Yesterday I finish a small pet project where I needed to move data across sql server instances without the option of staging the data. Here was my solution, The script will create a package with two sources join by a mergejoin component follow to a conditional split and a rowcount...How the package look
What The Code Generate:
Code To generate this package:
' Microsoft SQL Server Integration Services Script Task
' Write scripts using Microsoft Visual Basic
' The ScriptMain class is the entry point of the Script Task.
Imports System
Imports System.Data
Imports System.Math
Imports Microsoft.SqlServer.Dts.Runtime
Imports Microsoft.SqlServer.Dts.Runtime.Wrapper
Imports Microsoft.SqlServer.Dts.Pipeline
Imports Microsoft.SqlServer.Dts.Pipeline.Wrapper
Public Class ScriptMain
' The execution engine calls this method when the task executes.
' To access the object model, use the Dts object. Connections, variables, events,
' and logging features are available as static members of the Dts class.
' Before returning from this method, set the value of Dts.TaskResult to indicate success or failure.
'
' To open Code and Text Editor Help, press F1.
' To open Object Browser, press Ctrl+Alt+J.
Public Sub Main()
'Variables
Dts.VariableDispenser.LockForWrite("User::int_InsertCount")
Dts.VariableDispenser.LockForRead("User::str_SourceSqlCmd")
Dts.VariableDispenser.LockForRead("User::str_SourceDestinationSqlCmd")
Dts.VariableDispenser.LockForRead("User::str_SourceTable")
Dts.VariableDispenser.LockForRead("User::str_DestinationTable")
Dim var As Variables
Dts.VariableDispenser.GetVariables(var)
'Package Variables
Dim pkg As Microsoft.SqlServer.Dts.Runtime.Package
Dim InsertCount As Microsoft.SqlServer.Dts.Runtime.Variable
Dim SourceSqlCmd As String
Dim SourceDestinationSqlCmd As String
Dim SourceTable As String
Dim DestinationTable As String
Dim ConditionalExpression As String
Dim idx As Int32
Dim FastLoadMaxCommitSize As Int32
Dim RowsPerBatch As String
'Logging
Dim pkgLogging As LogProvider
'ConnectionManager Variables
Dim SourceConnMgr As ConnectionManager
Dim DestinationConnMgr As ConnectionManager
Dim MsdbConnMgr As ConnectionManager
'Connection Strings
Dim SourceConStr As String
Dim DestinationStr As String
Dim msdbStr As String
'Executable
Dim exec As Microsoft.SqlServer.Dts.Runtime.Executable
Dim DataFlow As Microsoft.SqlServer.Dts.Runtime.Executable
'Sequences
Dim seg1 As Microsoft.SqlServer.Dts.Runtime.Sequence
Dim seg2 As Microsoft.SqlServer.Dts.Runtime.Sequence
Dim seg3 As Microsoft.SqlServer.Dts.Runtime.Sequence
Dim seg4 As Microsoft.SqlServer.Dts.Runtime.Sequence
'Paths
Dim Source2Merge As IDTSPath90
Dim SourceDestination2Merge As IDTSPath90
Dim Merge2Condition As IDTSPath90
Dim Condition2RowCount As IDTSPath90
Dim RowCount2Destination As IDTSPath90
'Properties
Dim MergeJoinProperty1 As IDTSCustomProperty90
Dim MergeJoinProperty2 As IDTSCustomProperty90
'Component Variables
Dim DataFlowTaskHost As Microsoft.SqlServer.Dts.Runtime.TaskHost
Dim DataFlowTask As MainPipe
Dim SourceComponent As IDTSComponentMetaData90
Dim SourceDestinationComponent As IDTSComponentMetaData90
Dim MergeJoinComponent As IDTSComponentMetaData90
Dim ConditionalSplitComponent As IDTSComponentMetaData90
Dim RowCountComponent As IDTSComponentMetaData90
Dim DestinationComponent As IDTSComponentMetaData90
'Instance Variables
Dim SourceInstance As CManagedComponentWrapper
Dim SourceDestinationInstance As CManagedComponentWrapper
Dim MergeJoinInstance As CManagedComponentWrapper
Dim ConditionalSplitInstance As CManagedComponentWrapper
Dim RowCountInstance As CManagedComponentWrapper
Dim DestinationInstance As CManagedComponentWrapper
'Inputs
Dim Input1Merge As IDTSInput90
Dim Input2Merge As IDTSInput90
Dim InputConditional As IDTSInput90
Dim InputDestination As IDTSInput90
'Outputs
Dim OutPutMerge As IDTSOutput90
Dim output2Merge As IDTSOutputCollection90
Dim OutPutConditional As IDTSOutput90
'Virtual Input Collections
Dim DestinationVirtualInput As IDTSVirtualInput90
Dim DestinationVirtualInputColumns As IDTSVirtualInputColumnCollection90
Dim SourceMergeVirtualInput As IDTSVirtualInput90
Dim SourceMergeVirtualInputColumns As IDTSVirtualInputColumnCollection90
Dim SourceDestinationMergeVirtualInput As IDTSVirtualInput90
Dim SourceDestinationMergeVirtualInputColumns As IDTSVirtualInputColumnCollection90
Dim ConditionalVirtualInput As IDTSVirtualInput90
Dim CoditionalVirtualInputColumns As IDTSVirtualInputColumnCollection90
Try
'Initialization
idx = 0
pkg = New Microsoft.SqlServer.Dts.Runtime.Package
pkg.IsolationLevel = IsolationLevel.Serializable
pkg.Name = "Project_merge"
ConditionalExpression = "ISNULL(CheckId) == TRUE"
InsertCount = pkg.Variables.Add("int_InsertRowCount", False, "User", 0)
FastLoadMaxCommitSize = 10000
RowsPerBatch = "50000"
'General variables
SourceSqlCmd = var("User::str_SourceSqlCmd").Value.ToString()
SourceDestinationSqlCmd = var("User::str_SourceDestinationSqlCmd").Value.ToString()
SourceTable = var("User::str_SourceTable").Value.ToString()
DestinationTable = var("User::str_DestinationTable").Value.ToString()
'ConnectionStrings
SourceConStr = Dts.Connections("Source").ConnectionString
DestinationStr = Dts.Connections("Destination").ConnectionString
msdbStr = Dts.Connections("msdb").ConnectionString
'Set Connection Manager
SourceConnMgr = CreateOLEDBConnection(pkg, "Source", SourceConStr)
DestinationConnMgr = CreateOLEDBConnection(pkg, "Destination", DestinationStr)
MsdbConnMgr = CreateOLEDBConnection(pkg, "msdb", msdbStr)
' Add Logging
pkgLogging = pkg.LogProviders.Add("DTS.LogProviderSQLServer.1")
pkgLogging.ConfigString = MsdbConnMgr.Name.ToString()
pkgLogging.Description = "Packing logging Using msdb"
pkg.LoggingOptions.SelectedLogProviders.Add(pkgLogging)
pkg.LoggingOptions.EventFilterKind = Microsoft.SqlServer.Dts.Runtime.DTSEventFilterKind.Inclusion
pkg.LoggingOptions.EventFilter = New String() {"OnPreExecute", "OnPostExecute", "OnError"}
pkg.LoggingMode = Microsoft.SqlServer.Dts.Runtime.DTSLoggingMode.Disabled
'Set Sequences
exec = pkg.Executables.Add("STOCK:Sequence")
seg1 = CType(exec, Microsoft.SqlServer.Dts.Runtime.Sequence)
seg2 = CType(exec, Microsoft.SqlServer.Dts.Runtime.Sequence)
seg3 = CType(exec, Microsoft.SqlServer.Dts.Runtime.Sequence)
seg4 = CType(exec, Microsoft.SqlServer.Dts.Runtime.Sequence)
seg1.Name = "SourceSequence"
seg2.Name = "SourceDestinationSequence"
seg3.Name = "MergeSequence"
seg4.Name = "ConditionSequence"
'Set dataFlow
DataFlow = seg3.Executables.Add("STOCK:PipelineTask")
DataFlowTaskHost = CType(DataFlow, Microsoft.SqlServer.Dts.Runtime.TaskHost)
DataFlowTaskHost.Name = "ArchiveExportUsingMerge"
DataFlowTask = CType(DataFlowTaskHost.InnerObject, MainPipe)
'Add Sources
AddComponent(SourceComponent, "Source", "DTSAdapter.OleDbSource.1", DataFlowTask)
Instantiate(SourceInstance, SourceComponent)
SourceComponent.Name = "Source"
SetComponnetConnection(SourceComponent, SourceConnMgr)
SourceInstance.SetComponentProperty("AccessMode", 2)
SourceInstance.SetComponentProperty("SqlCommand", SourceSqlCmd)
SourceInstance.SetComponentProperty("OpenRowset", SourceTable)
'Reinitialize the metadata, Refresh Columns
Refresh(SourceInstance)
SourceComponent.OutputCollection(0).IsSorted = True
SourceComponent.OutputCollection(0).OutputColumnCollection(0).SortKeyPosition = 1
AddComponent(SourceDestinationComponent, "SourceDestination", "DTSAdapter.OleDbSource.1", DataFlowTask)
Instantiate(SourceDestinationInstance, SourceDestinationComponent)
SourceDestinationComponent.Name = "SourceDestination"
SetComponnetConnection(SourceDestinationComponent, DestinationConnMgr)
SourceDestinationInstance.SetComponentProperty("AccessMode", 2)
SourceDestinationInstance.SetComponentProperty("SqlCommand", SourceDestinationSqlCmd)
SourceDestinationInstance.SetComponentProperty("OpenRowset", DestinationTable)
'Reinitialize the metadata, Refresh Columns
Refresh(SourceDestinationInstance)
SourceDestinationComponent.OutputCollection(0).IsSorted = True
SourceDestinationComponent.OutputCollection(0).OutputColumnCollection(0).SortKeyPosition = 1
'Add Merge
AddComponent(MergeJoinComponent, "MergeJoin", "DTSTransform.MergeJoin.1", DataFlowTask)
Instantiate(MergeJoinInstance, MergeJoinComponent)
'Left Connection
MergeJoinComponent.InputCollection(0).ExternalMetadataColumnCollection.IsUsed = False
MergeJoinComponent.InputCollection(0).HasSideEffects = False
'Right Connection
MergeJoinComponent.InputCollection(1).ExternalMetadataColumnCollection.IsUsed = False
MergeJoinComponent.InputCollection(1).HasSideEffects = False
'Join Source and MergeJoin
Source2Merge = DataFlowTask.PathCollection.[New]
Source2Merge.AttachPathAndPropagateNotifications(SourceComponent.OutputCollection(0), MergeJoinComponent.InputCollection(0))
SourceDestination2Merge = DataFlowTask.PathCollection.[New]
SourceDestination2Merge.AttachPathAndPropagateNotifications(SourceDestinationComponent.OutputCollection(0), MergeJoinComponent.InputCollection(1))
'Mappings
Input1Merge = MergeJoinComponent.InputCollection(0)
SourceMergeVirtualInput = Input1Merge.GetVirtualInput()
Input2Merge = MergeJoinComponent.InputCollection(1)
SourceDestinationMergeVirtualInput = Input2Merge.GetVirtualInput()
' Input 1
For Each vcolumn As IDTSVirtualInputColumn90 In SourceMergeVirtualInput.VirtualInputColumnCollection
Dim inputColumn As IDTSInputColumn90 = MergeJoinInstance.SetUsageType(Input1Merge.ID, SourceMergeVirtualInput, vcolumn.LineageID, DTSUsageType.UT_READONLY)
Next
'input 2
For Each vcolumn As IDTSVirtualInputColumn90 In SourceDestinationMergeVirtualInput.VirtualInputColumnCollection
Dim inputColumn As IDTSInputColumn90 = MergeJoinInstance.SetUsageType(Input2Merge.ID, SourceDestinationMergeVirtualInput, vcolumn.LineageID, DTSUsageType.UT_READONLY)
Next
MergeJoinProperty1 = MergeJoinComponent.CustomPropertyCollection(0)
MergeJoinProperty1.Value = 1
MergeJoinProperty2 = MergeJoinComponent.CustomPropertyCollection(1)
MergeJoinProperty2.Value = 1
' Give Alias to Pk Column
OutPutMerge = MergeJoinComponent.OutputCollection(0)
idx = OutPutMerge.OutputColumnCollection.Count - 1
OutPutMerge.OutputColumnCollection(idx).Name = "CheckId"
'Reinitialized metadata, refresh
Refresh(MergeJoinInstance)
' Add Conditional split
AddComponent(ConditionalSplitComponent, "ConditionalSplit", "DTSTransform.ConditionalSplit.1", DataFlowTask)
Instantiate(ConditionalSplitInstance, ConditionalSplitComponent)
'Reinitialized metadata, refresh
Refresh(ConditionalSplitInstance)
Merge2Condition = DataFlowTask.PathCollection.[New]
Merge2Condition.AttachPathAndPropagateNotifications(MergeJoinComponent.OutputCollection(0), ConditionalSplitComponent.InputCollection(0))
OutPutConditional = ConditionalSplitInstance.InsertOutput(DTSInsertPlacement.IP_BEFORE, ConditionalSplitComponent.OutputCollection(0).ID)
OutPutConditional.Name = "Inserts"
OutPutConditional.Description = "Inserts New Records"
InputConditional = ConditionalSplitComponent.InputCollection(0)
ConditionalVirtualInput = InputConditional.GetVirtualInput()
CoditionalVirtualInputColumns = ConditionalVirtualInput.VirtualInputColumnCollection
For Each vColumn As IDTSVirtualInputColumn90 In ConditionalVirtualInput.VirtualInputColumnCollection
If vColumn.Name = "CheckId" Then
Dim col As IDTSInputColumn90 = ConditionalSplitInstance.SetUsageType(InputConditional.ID, ConditionalVirtualInput, vColumn.LineageID, DTSUsageType.UT_READONLY)
ConditionalSplitInstance.SetOutputProperty(OutPutConditional.ID, "FriendlyExpression", ConditionalExpression)
End If
Next
'Add Row Count Component
AddComponent(RowCountComponent, "InsertRowCount", "DTSTransform.RowCount.1", DataFlowTask)
Instantiate(RowCountInstance, RowCountComponent)
RowCountInstance.SetComponentProperty("VariableName", "User::int_InsertRowCount")
Refresh(RowCountInstance)
Condition2RowCount = DataFlowTask.PathCollection.[New]
Condition2RowCount.AttachPathAndPropagateNotifications(OutPutConditional, RowCountComponent.InputCollection(0))
'Add Destination Component
AddComponent(DestinationComponent, "Destination", "DTSAdapter.OLEDBDestination.1", DataFlowTask)
Instantiate(DestinationInstance, DestinationComponent)
DestinationComponent.Name = "Destination"
DestinationComponent.Description = "Destination Table"
DestinationComponent.ValidateExternalMetadata = True
DestinationComponent.RuntimeConnectionCollection(0).ConnectionManagerID = DestinationConnMgr.ID
DestinationComponent.RuntimeConnectionCollection(0).ConnectionManager = DtsConvert.ToConnectionManager90(DestinationConnMgr)
'Set AccessMode
DestinationInstance.SetComponentProperty("AccessMode", 3)
DestinationInstance.SetComponentProperty("OpenRowset", DestinationTable)
DestinationInstance.SetComponentProperty("FastLoadKeepIdentity", True)
DestinationInstance.SetComponentProperty("FastLoadMaxInsertCommitSize", FastLoadMaxCommitSize)
DestinationInstance.SetComponentProperty("FastLoadKeepNulls", False)
DestinationInstance.SetComponentProperty("FastLoadOptions", "TABLOCK,CHECK_CONSTRAINTS,ROWS_PER_BATCH = " + RowsPerBatch)
DestinationInstance.Validate()
Refresh(DestinationInstance)
'Connect Destination and RowCount
RowCount2Destination = DataFlowTask.PathCollection.[New]
RowCount2Destination.AttachPathAndPropagateNotifications(RowCountComponent.OutputCollection(0), DestinationComponent.InputCollection(0))
'Mappings
InputDestination = DestinationComponent.InputCollection(0)
DestinationVirtualInput = InputDestination.GetVirtualInput
DestinationVirtualInputColumns = CType(DestinationVirtualInput.VirtualInputColumnCollection, IDTSVirtualInputColumnCollection90)
'Mapping
For Each vcolumn As IDTSVirtualInputColumn90 In DestinationVirtualInput.VirtualInputColumnCollection
If vcolumn.Name = "CheckId" Then
Dim inputColumn As IDTSInputColumn90 = DestinationInstance.SetUsageType(InputDestination.ID, DestinationVirtualInput, vcolumn.LineageID, DTSUsageType.UT_IGNORED)
' Dim externalColumn As IDTSExternalMetadataColumn90 = InputDestination.ExternalMetadataColumnCollection(inputColumn.Name)
' DestinationInstance.MapInputColumn(InputDestination.ID, inputColumn.ID, externalColumn.ID)
Else
Dim inputColumn As IDTSInputColumn90 = DestinationInstance.SetUsageType(InputDestination.ID, DestinationVirtualInput, vcolumn.LineageID, DTSUsageType.UT_READONLY)
Dim externalColumn As IDTSExternalMetadataColumn90 = InputDestination.ExternalMetadataColumnCollection(inputColumn.Name)
DestinationInstance.MapInputColumn(InputDestination.ID, inputColumn.ID, externalColumn.ID)
End If
Next
'Save Package
Dim App As Microsoft.SqlServer.Dts.Runtime.Application = New Microsoft.SqlServer.Dts.Runtime.Application
App.SaveToXml(String.Format("C:\Temp\ssis_" + pkg.Name.ToString() + "_" + Format(Now(), "MMddyyyyss") + ".dtsx", pkg.Name), pkg, Nothing)
pkg.Execute()
Dts.TaskResult = Dts.Results.Success
Catch ex As Exception
'Save Package
Dim App As Microsoft.SqlServer.Dts.Runtime.Application = New Microsoft.SqlServer.Dts.Runtime.Application
App.SaveToXml(String.Format("C:\Temp\ssis_Error_" + pkg.Name.ToString() + "_" + Format(Now(), "MMddyyyyss") + ".dtsx", pkg.Name), pkg, Nothing)
Dts.TaskResult = Dts.Results.Failure
End Try
End Sub
Private Sub AddComponent(ByRef c As IDTSComponentMetaData90, ByVal Name As String, ByVal ComponentClassId As String, ByRef dft As MainPipe)
c = dft.ComponentMetaDataCollection.[New]
c.Name = Name
c.ComponentClassID = ComponentClassId
End Sub
Private Sub Instantiate(ByRef i As CManagedComponentWrapper, ByRef c As IDTSComponentMetaData90)
i = c.Instantiate
i.ProvideComponentProperties()
End Sub
Private Sub SetComponnetConnection(ByRef c As IDTSComponentMetaData90, ByVal CMgr As ConnectionManager)
c.RuntimeConnectionCollection(0).ConnectionManagerID = CMgr.ID
c.RuntimeConnectionCollection(0).ConnectionManager = DtsConvert.ToConnectionManager90(CMgr)
End Sub
Private Function CreateOLEDBConnection(ByVal p As Microsoft.SqlServer.Dts.Runtime.Package, _
ByVal ConName As String, ByVal ConStr As String) As ConnectionManager
Dim ConMgr As ConnectionManager = p.Connections.Add("OLEDB")
ConMgr.ConnectionString = ConStr
ConMgr.Name = ConName
ConMgr.Description = "SQL OLE DB Using " + ConName + " Connection"
Return ConMgr
End Function
Private Sub Refresh(ByRef i As CManagedComponentWrapper)
i.AcquireConnections(Nothing)
i.ReinitializeMetaData()
i.ReleaseConnections()
End Sub
End Class
nice code
ReplyDelete