December 23, 2011

SSIS Programming: Merge Join and Conditional Split Sample

Yesterday I finished a small pet project where I needed to move data across SQL Server instances without the option of staging the data. Here is my solution: the script creates a package with two sources joined by a Merge Join component, followed by a Conditional Split and a Row Count. Here is how the package looks:

What the code generates:
Code To generate this package:
' Microsoft SQL Server Integration Services Script Task
' Write scripts using Microsoft Visual Basic
' The ScriptMain class is the entry point of the Script Task.

Imports System
Imports System.Data
Imports System.Math
Imports Microsoft.SqlServer.Dts.Runtime
Imports Microsoft.SqlServer.Dts.Runtime.Wrapper
Imports Microsoft.SqlServer.Dts.Pipeline
Imports Microsoft.SqlServer.Dts.Pipeline.Wrapper


Public Class ScriptMain

    ' Entry point of the Script Task (called by the SSIS execution engine).
    '
    ' Builds a new SSIS package ("Project_merge") in memory whose data flow:
    '   * reads rows from the "Source" connection (User::str_SourceSqlCmd) and from the
    '     "Destination" connection (User::str_SourceDestinationSqlCmd); both outputs are flagged
    '     as sorted on their first output column,
    '   * joins them with a Merge Join and renames the last join output column to "CheckId",
    '   * sends rows matching "ISNULL(CheckId) == TRUE" to a Conditional Split output "Inserts",
    '   * counts those rows into the child-package variable User::int_InsertRowCount,
    '   * and fast-loads them into User::str_DestinationTable on the "Destination" connection.
    ' The package is saved as XML under C:\Temp\ and then executed. Dts.TaskResult is Success
    ' only if building the package AND executing it both succeed; otherwise errors are raised
    ' through Dts.Events.FireError and the result is Failure.
    Public Sub Main()
        Dim var As Variables = Nothing
        Dim pkg As Microsoft.SqlServer.Dts.Runtime.Package = Nothing

        Try
            ' ---- Parent-package variables ----------------------------------------------
            ' NOTE(review): User::int_InsertCount is locked for write but never assigned here;
            ' presumably it was meant to receive the child package's row count - confirm.
            Dts.VariableDispenser.LockForWrite("User::int_InsertCount")
            Dts.VariableDispenser.LockForRead("User::str_SourceSqlCmd")
            Dts.VariableDispenser.LockForRead("User::str_SourceDestinationSqlCmd")
            Dts.VariableDispenser.LockForRead("User::str_SourceTable")
            Dts.VariableDispenser.LockForRead("User::str_DestinationTable")
            Dts.VariableDispenser.GetVariables(var)

            Dim SourceSqlCmd As String = var("User::str_SourceSqlCmd").Value.ToString()
            Dim SourceDestinationSqlCmd As String = var("User::str_SourceDestinationSqlCmd").Value.ToString()
            Dim SourceTable As String = var("User::str_SourceTable").Value.ToString()
            Dim DestinationTable As String = var("User::str_DestinationTable").Value.ToString()

            ' ---- Settings of the generated package ---------------------------------------
            ' Rows with no match on the right (destination) side have a NULL CheckId -> new rows.
            Dim ConditionalExpression As String = "ISNULL(CheckId) ==  TRUE"
            Dim FastLoadMaxCommitSize As Int32 = 10000
            Dim RowsPerBatch As String = "50000"

            pkg = New Microsoft.SqlServer.Dts.Runtime.Package
            pkg.IsolationLevel = IsolationLevel.Serializable
            pkg.Name = "Project_merge"
            ' Child-package variable written by the Row Count component below.
            pkg.Variables.Add("int_InsertRowCount", False, "User", 0)

            ' ---- Connection managers (copied from this package's connections) -----------
            Dim SourceConnMgr As ConnectionManager = _
                CreateOLEDBConnection(pkg, "Source", Dts.Connections("Source").ConnectionString)
            Dim DestinationConnMgr As ConnectionManager = _
                CreateOLEDBConnection(pkg, "Destination", Dts.Connections("Destination").ConnectionString)
            Dim MsdbConnMgr As ConnectionManager = _
                CreateOLEDBConnection(pkg, "msdb", Dts.Connections("msdb").ConnectionString)

            ' ---- Logging -----------------------------------------------------------------
            Dim pkgLogging As LogProvider = pkg.LogProviders.Add("DTS.LogProviderSQLServer.1")
            pkgLogging.ConfigString = MsdbConnMgr.Name
            pkgLogging.Description = "Packing logging Using msdb"
            pkg.LoggingOptions.SelectedLogProviders.Add(pkgLogging)
            pkg.LoggingOptions.EventFilterKind = Microsoft.SqlServer.Dts.Runtime.DTSEventFilterKind.Inclusion
            pkg.LoggingOptions.EventFilter = New String() {"OnPreExecute", "OnPostExecute", "OnError"}
            ' NOTE(review): logging is configured above but then switched off here; set
            ' LoggingMode to Enabled if the msdb log is actually wanted.
            pkg.LoggingMode = Microsoft.SqlServer.Dts.Runtime.DTSLoggingMode.Disabled

            ' ---- Containers: each AddSequence call creates its own Sequence ---------------
            AddSequence(pkg, "SourceSequence")
            AddSequence(pkg, "SourceDestinationSequence")
            Dim MergeSequence As Microsoft.SqlServer.Dts.Runtime.Sequence = AddSequence(pkg, "MergeSequence")
            AddSequence(pkg, "ConditionSequence")

            ' ---- Data flow task inside the merge sequence --------------------------------
            Dim DataFlowTaskHost As Microsoft.SqlServer.Dts.Runtime.TaskHost = _
                CType(MergeSequence.Executables.Add("STOCK:PipelineTask"), Microsoft.SqlServer.Dts.Runtime.TaskHost)
            DataFlowTaskHost.Name = "ArchiveExportUsingMerge"
            Dim DataFlowTask As MainPipe = CType(DataFlowTaskHost.InnerObject, MainPipe)

            ' ---- Sources (left and right inputs of the merge join) -----------------------
            ' NOTE: IsSorted is only metadata; both SQL commands must really ORDER BY their
            ' first column or the Merge Join produces wrong results.
            Dim SourceComponent As IDTSComponentMetaData90 = _
                AddSortedOleDbSource(DataFlowTask, "Source", SourceConnMgr, SourceSqlCmd, SourceTable)
            Dim SourceDestinationComponent As IDTSComponentMetaData90 = _
                AddSortedOleDbSource(DataFlowTask, "SourceDestination", DestinationConnMgr, SourceDestinationSqlCmd, DestinationTable)

            ' ---- Merge join --------------------------------------------------------------
            Dim MergeJoinComponent As IDTSComponentMetaData90 = Nothing
            Dim MergeJoinInstance As CManagedComponentWrapper = Nothing
            AddComponent(MergeJoinComponent, "MergeJoin", "DTSTransform.MergeJoin.1", DataFlowTask)
            Instantiate(MergeJoinInstance, MergeJoinComponent)
            For inputIndex As Integer = 0 To 1 ' 0 = left input, 1 = right input
                MergeJoinComponent.InputCollection(inputIndex).ExternalMetadataColumnCollection.IsUsed = False
                MergeJoinComponent.InputCollection(inputIndex).HasSideEffects = False
            Next

            ConnectPath(DataFlowTask, SourceComponent.OutputCollection(0), MergeJoinComponent.InputCollection(0))
            ConnectPath(DataFlowTask, SourceDestinationComponent.OutputCollection(0), MergeJoinComponent.InputCollection(1))

            ' Pass every column of both inputs through the join.
            UseAllVirtualColumns(MergeJoinInstance, MergeJoinComponent.InputCollection(0))
            UseAllVirtualColumns(MergeJoinInstance, MergeJoinComponent.InputCollection(1))

            ' NOTE(review): properties are addressed by index; presumably (0) = JoinType
            ' (1 = left outer join) and (1) = NumKeyColumns (1 key column) - confirm.
            MergeJoinComponent.CustomPropertyCollection(0).Value = 1
            MergeJoinComponent.CustomPropertyCollection(1).Value = 1

            ' Alias the last output column (assumed to be the right-side key) as "CheckId".
            Dim OutPutMerge As IDTSOutput90 = MergeJoinComponent.OutputCollection(0)
            OutPutMerge.OutputColumnCollection(OutPutMerge.OutputColumnCollection.Count - 1).Name = "CheckId"
            Refresh(MergeJoinInstance)

            ' ---- Conditional split -------------------------------------------------------
            Dim ConditionalSplitComponent As IDTSComponentMetaData90 = Nothing
            Dim ConditionalSplitInstance As CManagedComponentWrapper = Nothing
            AddComponent(ConditionalSplitComponent, "ConditionalSplit", "DTSTransform.ConditionalSplit.1", DataFlowTask)
            Instantiate(ConditionalSplitInstance, ConditionalSplitComponent)
            Refresh(ConditionalSplitInstance)
            ConnectPath(DataFlowTask, MergeJoinComponent.OutputCollection(0), ConditionalSplitComponent.InputCollection(0))

            ' New output inserted before OutputCollection(0); it carries the rows to insert.
            Dim OutPutConditional As IDTSOutput90 = ConditionalSplitInstance.InsertOutput( _
                DTSInsertPlacement.IP_BEFORE, ConditionalSplitComponent.OutputCollection(0).ID)
            OutPutConditional.Name = "Inserts"
            OutPutConditional.Description = "Inserts New Records"

            Dim InputConditional As IDTSInput90 = ConditionalSplitComponent.InputCollection(0)
            Dim ConditionalVirtualInput As IDTSVirtualInput90 = InputConditional.GetVirtualInput()
            Dim checkIdFound As Boolean = False
            For Each vColumn As IDTSVirtualInputColumn90 In ConditionalVirtualInput.VirtualInputColumnCollection
                If vColumn.Name = "CheckId" Then
                    ConditionalSplitInstance.SetUsageType(InputConditional.ID, ConditionalVirtualInput, vColumn.LineageID, DTSUsageType.UT_READONLY)
                    ConditionalSplitInstance.SetOutputProperty(OutPutConditional.ID, "FriendlyExpression", ConditionalExpression)
                    checkIdFound = True
                End If
            Next
            ' Without CheckId the "Inserts" output would have no expression; fail clearly now.
            If Not checkIdFound Then
                Throw New InvalidOperationException("Merge Join output column 'CheckId' was not found; the Inserts output has no expression.")
            End If

            ' ---- Row count ---------------------------------------------------------------
            Dim RowCountComponent As IDTSComponentMetaData90 = Nothing
            Dim RowCountInstance As CManagedComponentWrapper = Nothing
            AddComponent(RowCountComponent, "InsertRowCount", "DTSTransform.RowCount.1", DataFlowTask)
            Instantiate(RowCountInstance, RowCountComponent)
            RowCountInstance.SetComponentProperty("VariableName", "User::int_InsertRowCount")
            Refresh(RowCountInstance)
            ConnectPath(DataFlowTask, OutPutConditional, RowCountComponent.InputCollection(0))

            ' ---- Destination -------------------------------------------------------------
            Dim DestinationComponent As IDTSComponentMetaData90 = Nothing
            Dim DestinationInstance As CManagedComponentWrapper = Nothing
            AddComponent(DestinationComponent, "Destination", "DTSAdapter.OLEDBDestination.1", DataFlowTask)
            Instantiate(DestinationInstance, DestinationComponent)
            DestinationComponent.Name = "Destination"
            DestinationComponent.Description = "Destination Table"
            DestinationComponent.ValidateExternalMetadata = True
            SetComponnetConnection(DestinationComponent, DestinationConnMgr)

            ' AccessMode 3 = OpenRowset using fast load into DestinationTable.
            DestinationInstance.SetComponentProperty("AccessMode", 3)
            DestinationInstance.SetComponentProperty("OpenRowset", DestinationTable)
            DestinationInstance.SetComponentProperty("FastLoadKeepIdentity", True)
            DestinationInstance.SetComponentProperty("FastLoadMaxInsertCommitSize", FastLoadMaxCommitSize)
            DestinationInstance.SetComponentProperty("FastLoadKeepNulls", False)
            DestinationInstance.SetComponentProperty("FastLoadOptions", "TABLOCK,CHECK_CONSTRAINTS,ROWS_PER_BATCH = " & RowsPerBatch)
            DestinationInstance.Validate()
            Refresh(DestinationInstance)
            ConnectPath(DataFlowTask, RowCountComponent.OutputCollection(0), DestinationComponent.InputCollection(0))

            ' Map every incoming column to the destination column of the same name, except
            ' CheckId, which only drives the split and has no destination column.
            Dim InputDestination As IDTSInput90 = DestinationComponent.InputCollection(0)
            Dim DestinationVirtualInput As IDTSVirtualInput90 = InputDestination.GetVirtualInput()
            For Each vColumn As IDTSVirtualInputColumn90 In DestinationVirtualInput.VirtualInputColumnCollection
                If vColumn.Name <> "CheckId" Then
                    Dim inputColumn As IDTSInputColumn90 = DestinationInstance.SetUsageType( _
                        InputDestination.ID, DestinationVirtualInput, vColumn.LineageID, DTSUsageType.UT_READONLY)
                    Dim externalColumn As IDTSExternalMetadataColumn90 = _
                        InputDestination.ExternalMetadataColumnCollection(inputColumn.Name)
                    DestinationInstance.MapInputColumn(InputDestination.ID, inputColumn.ID, externalColumn.ID)
                End If
            Next

            ' ---- Save, run, and report the child package's real outcome ------------------
            SavePackageXml(pkg, "")

            Dim execResult As Microsoft.SqlServer.Dts.Runtime.DTSExecResult = pkg.Execute()
            If execResult = Microsoft.SqlServer.Dts.Runtime.DTSExecResult.Success Then
                Dts.TaskResult = Dts.Results.Success
            Else
                For Each pkgError As Microsoft.SqlServer.Dts.Runtime.DtsError In pkg.Errors
                    Dts.Events.FireError(pkgError.ErrorCode, pkg.Name, pkgError.Description, String.Empty, 0)
                Next
                Dts.TaskResult = Dts.Results.Failure
            End If

        Catch ex As Exception
            Dts.Events.FireError(0, "ScriptMain", ex.Message, String.Empty, 0)
            ' Keep a copy of the partially built package for diagnosis, without letting a
            ' failed save hide the original error.
            If pkg IsNot Nothing Then
                Try
                    SavePackageXml(pkg, "Error_")
                Catch saveEx As Exception
                    Dts.Events.FireWarning(0, "ScriptMain", "Could not save the failed package: " & saveEx.Message, String.Empty, 0)
                End Try
            End If
            Dts.TaskResult = Dts.Results.Failure
        Finally
            ' Release the variable locks taken by GetVariables.
            If var IsNot Nothing Then
                var.Unlock()
            End If
        End Try
    End Sub

    ' Creates a new data-flow component with the given class ID in dft and names it.
    ' The new metadata object is returned through c.
    Private Sub AddComponent(ByRef c As IDTSComponentMetaData90, ByVal Name As String, ByVal ComponentClassId As String, ByRef dft As MainPipe)
        c = dft.ComponentMetaDataCollection.[New]
        c.Name = Name
        c.ComponentClassID = ComponentClassId
    End Sub

    ' Instantiates the design-time wrapper for c into i and lets the component create its
    ' default properties, inputs and outputs. Callers re-set Name afterwards where needed.
    Private Sub Instantiate(ByRef i As CManagedComponentWrapper, ByRef c As IDTSComponentMetaData90)
        i = c.Instantiate
        i.ProvideComponentProperties()
    End Sub

    ' Binds the first runtime connection of component c to connection manager CMgr.
    Private Sub SetComponnetConnection(ByRef c As IDTSComponentMetaData90, ByVal CMgr As ConnectionManager)
        c.RuntimeConnectionCollection(0).ConnectionManagerID = CMgr.ID
        c.RuntimeConnectionCollection(0).ConnectionManager = DtsConvert.ToConnectionManager90(CMgr)
    End Sub

    ' Adds an OLEDB connection manager named ConName with connection string ConStr to p.
    Private Function CreateOLEDBConnection(ByVal p As Microsoft.SqlServer.Dts.Runtime.Package, _
                                       ByVal ConName As String, ByVal ConStr As String) As ConnectionManager
        Dim ConMgr As ConnectionManager = p.Connections.Add("OLEDB")
        ConMgr.ConnectionString = ConStr
        ConMgr.Name = ConName
        ConMgr.Description = "SQL OLE DB Using " + ConName + " Connection"
        Return ConMgr
    End Function

    ' Refreshes component metadata against its live connection. Connections are always
    ' released, even when ReinitializeMetaData throws.
    Private Sub Refresh(ByRef i As CManagedComponentWrapper)
        i.AcquireConnections(Nothing)
        Try
            i.ReinitializeMetaData()
        Finally
            i.ReleaseConnections()
        End Try
    End Sub

    ' Adds a separately created Sequence container named sequenceName to pkg and returns it.
    Private Function AddSequence(ByVal pkg As Microsoft.SqlServer.Dts.Runtime.Package, _
                                 ByVal sequenceName As String) As Microsoft.SqlServer.Dts.Runtime.Sequence
        Dim seq As Microsoft.SqlServer.Dts.Runtime.Sequence = _
            CType(pkg.Executables.Add("STOCK:Sequence"), Microsoft.SqlServer.Dts.Runtime.Sequence)
        seq.Name = sequenceName
        Return seq
    End Function

    ' Adds an OLE DB source named componentName that runs sqlCmd on connMgr (AccessMode 2 =
    ' SQL command). Its output is flagged as sorted on the first output column so it can
    ' feed a Merge Join. Returns the component metadata.
    Private Function AddSortedOleDbSource(ByVal dft As MainPipe, ByVal componentName As String, _
                                          ByVal connMgr As ConnectionManager, ByVal sqlCmd As String, _
                                          ByVal tableName As String) As IDTSComponentMetaData90
        Dim component As IDTSComponentMetaData90 = Nothing
        Dim instance As CManagedComponentWrapper = Nothing
        AddComponent(component, componentName, "DTSAdapter.OleDbSource.1", dft)
        Instantiate(instance, component)
        component.Name = componentName
        SetComponnetConnection(component, connMgr)

        instance.SetComponentProperty("AccessMode", 2)
        instance.SetComponentProperty("SqlCommand", sqlCmd)
        instance.SetComponentProperty("OpenRowset", tableName)
        Refresh(instance)

        component.OutputCollection(0).IsSorted = True
        component.OutputCollection(0).OutputColumnCollection(0).SortKeyPosition = 1
        Return component
    End Function

    ' Creates a data-flow path in dft from output to input and propagates the attach
    ' notifications to both components.
    Private Sub ConnectPath(ByVal dft As MainPipe, ByVal output As IDTSOutput90, ByVal input As IDTSInput90)
        Dim newPath As IDTSPath90 = dft.PathCollection.[New]
        newPath.AttachPathAndPropagateNotifications(output, input)
    End Sub

    ' Selects every column of the virtual input feeding `input` as a read-only input column.
    Private Sub UseAllVirtualColumns(ByVal instance As CManagedComponentWrapper, ByVal input As IDTSInput90)
        Dim vInput As IDTSVirtualInput90 = input.GetVirtualInput()
        For Each vColumn As IDTSVirtualInputColumn90 In vInput.VirtualInputColumnCollection
            instance.SetUsageType(input.ID, vInput, vColumn.LineageID, DTSUsageType.UT_READONLY)
        Next
    End Sub

    ' Saves pkg as XML to C:\Temp\ssis_<prefix><package name>_<MMddyyyyHHmmss>.dtsx and
    ' returns the path. The timestamp includes hours and minutes so that runs on the same day
    ' do not overwrite each other.
    Private Function SavePackageXml(ByVal pkg As Microsoft.SqlServer.Dts.Runtime.Package, ByVal prefix As String) As String
        Dim fileName As String = "C:\Temp\ssis_" & prefix & pkg.Name & "_" & Format(Now(), "MMddyyyyHHmmss") & ".dtsx"
        Dim ssisApp As New Microsoft.SqlServer.Dts.Runtime.Application()
        ssisApp.SaveToXml(fileName, pkg, Nothing)
        Return fileName
    End Function


End Class

1 comment:

Contact Form

Name

Email *

Message *