I decided that instead of keep adding one piece of code at the time just add the final class used in the SQLSaturday#130 Schedule demonstration. The Final class is nothing more than a SQLTask, DataFlow with a row count and multicast:The Generated package is for SQL Sever 200,
I have received several request on how to used the mapping from source to destination and what better example than this. Hopefully this will answer some of the questions and like always, everything is limited to your potential. I have added a couple of pictures on how the package will look like:/*******************************************************************
*
* Jorge Novo
* SQLSaturday SSIS Demostration
* jorge.novo@gmail.com
* http://etldevelopernotes.blogspot.com/
* 4/27/2012
*
* Package to demostrate an empty ssis package
* Add Connection Strings and loggings
* Add SQL Task and Sequence Container
* Add Data Flow
* Add Source
* Add Destination
* add multicast
* add rowcount
* add eventhandle to the sequence
*
* Class 7- 7
*
*
*
* *****************************************************************/
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.IO;
using Microsoft.SqlServer.Dts.Runtime;
using Microsoft.SqlServer.Dts.Runtime.Wrapper;
using Microsoft.SqlServer.Dts.Pipeline.Wrapper;
using Microsoft.SqlServer.Dts.Tasks.ExecuteSQLTask;
namespace SQLSaturday130
{
public class Demo7 : IDisposable
{
private Microsoft.SqlServer.Dts.Runtime.Package myPackage = new Microsoft.SqlServer.Dts.Runtime.Package();
public void CreatePackage()
{
String SqlCmd = "Select count (*) from information_schema.tables";
String Description = "SQLSaturday Demo";
String SourceSchTable = "tblDemo1";
String tblDemo2 = "tblDemo2";
String tblDemo3 = "tblDemo3";
String tblDemo4 = "tblDemo4";
String RowsPerBatch = "10000";
int FastLoadMaxCommitSize = 50000;
//Add Variable to package
Microsoft.SqlServer.Dts.Runtime.Variable mvar = myPackage.Variables.Add("iDestCnt", false, "User", 0);
Microsoft.SqlServer.Dts.Runtime.Variable mrow = myPackage.Variables.Add("iRowCnt", false, "User", 0);
Microsoft.SqlServer.Dts.Runtime.Variable mevent = myPackage.Variables.Add("iError", false, "User", 0);
//Add and get executable reference to a SQLTAsk Component
Executable execSqlTask = AddSQLExecutable("STOCK:SQLTASk");
//Wrap the executable in a TaskHost
AddSQLTask(execSqlTask, "SQLTASK DEMO", "destination", SqlCmd, mvar.Name);
//Add a Sequence Container
Executable execSequence = AddSQLExecutable("STOCK:Sequence");
Microsoft.SqlServer.Dts.Runtime.Sequence seqSequence = (Microsoft.SqlServer.Dts.Runtime.Sequence)execSequence;
//add Properties
seqSequence.Name = "SEQUENCE Demo";
seqSequence.LoggingMode = Microsoft.SqlServer.Dts.Runtime.DTSLoggingMode.UseParentSetting;
seqSequence.Description = Description;
//Set precedence Constraint
Microsoft.SqlServer.Dts.Runtime.PrecedenceConstraint SqlTask2Sequece = myPackage.PrecedenceConstraints.Add(execSqlTask, execSequence);
SqlTask2Sequece.Value = Microsoft.SqlServer.Dts.Runtime.DTSExecResult.Success;
//Add Empty Work Flow
//Add Work Flow
Executable execDataFlow = seqSequence.Executables.Add("STOCK:PipelineTask");
//Wrapper
Microsoft.SqlServer.Dts.Runtime.TaskHost thDataFlow = (Microsoft.SqlServer.Dts.Runtime.TaskHost)execDataFlow;
thDataFlow.Name = "DataFlow Demo";
//reference thHost pipe
MainPipe dfDataFlow = (MainPipe)thDataFlow.InnerObject;
//Add Source Component to package
IDTSComponentMetaData100 icmdSource = AddComponentMetadata(dfDataFlow, "SourceComponet");
icmdSource.ComponentClassID = "DTSAdapter.OleDbSource.2";
//Add Destinations components
IDTSComponentMetaData100 icmDemo2 = AddComponentMetadata(dfDataFlow, "tblDemo2");
icmDemo2.ComponentClassID = "DTSAdapter.OLEDBDestination.2";
IDTSComponentMetaData100 icmDemo3 = AddComponentMetadata(dfDataFlow, "tblDemo3");
icmDemo3.ComponentClassID = "DTSAdapter.OLEDBDestination.2";
IDTSComponentMetaData100 icmDemo4 = AddComponentMetadata(dfDataFlow, "tblDemo4");
icmDemo4.ComponentClassID = "DTSAdapter.OLEDBDestination.2";
//Source Configuration
ConnectionManager source = myPackage.Connections["source"];
ConnectionManager destination = myPackage.Connections["destination"];
CManagedComponentWrapper mcwSource = icmdSource.Instantiate();
mcwSource.ProvideComponentProperties();
icmdSource.RuntimeConnectionCollection[0].ConnectionManagerID = source.ID;
icmdSource.RuntimeConnectionCollection[0].ConnectionManager = DtsConvert.GetExtendedInterface(source);
//mcwSource.SetComponentProperty("AccessMode",2)//SQLCmd
//mcwSource.SetComponentProperty("SqlCommand",SqlCmd);
mcwSource.SetComponentProperty("AccessMode", 0); //TableOrView
mcwSource.SetComponentProperty("OpenRowset", SourceSchTable);
RefreshMetadata(mcwSource);
//Destination Configuration
icmDemo2.ValidateExternalMetadata = true;
icmDemo3.ValidateExternalMetadata = true;
icmDemo4.ValidateExternalMetadata = true;
CManagedComponentWrapper mcwDemo2 = icmDemo2.Instantiate();
CManagedComponentWrapper mcwDemo3 = icmDemo3.Instantiate();
CManagedComponentWrapper mcwDemo4 = icmDemo4.Instantiate();
// tbldemo2
mcwDemo2.ProvideComponentProperties();
icmDemo2.Name = "tblDemo2";
icmDemo2.Description = "tblDemo2";
icmDemo2.RuntimeConnectionCollection[0].ConnectionManagerID = destination.ID;
icmDemo2.RuntimeConnectionCollection[0].ConnectionManager = DtsConvert.GetExtendedInterface(destination);
mcwDemo2.SetComponentProperty("AccessMode", 0);//FastLoad
mcwDemo2.SetComponentProperty("OpenRowset", tblDemo2);
mcwDemo2.Validate();
mcwDemo2.SetComponentProperty("FastLoadKeepIdentity", true);
mcwDemo2.SetComponentProperty("FastLoadMaxInsertCommitSize", FastLoadMaxCommitSize);
mcwDemo2.SetComponentProperty("FastLoadKeepNulls", false);
mcwDemo2.SetComponentProperty("FastLoadOptions", "TABLOCK,CHECK_CONSTRAINTS,ROWS_PER_BATCH = " + RowsPerBatch);
RefreshMetadata(mcwDemo2);
// tbldemo3
mcwDemo3.ProvideComponentProperties();
icmDemo3.Name = "tblDemo3";
icmDemo3.Description = "tblDemo3";
icmDemo3.RuntimeConnectionCollection[0].ConnectionManagerID = destination.ID;
icmDemo3.RuntimeConnectionCollection[0].ConnectionManager = DtsConvert.GetExtendedInterface(destination);
mcwDemo3.SetComponentProperty("AccessMode", 0);//FastLoad
mcwDemo3.SetComponentProperty("OpenRowset", tblDemo3);
mcwDemo3.Validate();
mcwDemo3.SetComponentProperty("FastLoadKeepIdentity", true);
mcwDemo3.SetComponentProperty("FastLoadMaxInsertCommitSize", FastLoadMaxCommitSize);
mcwDemo3.SetComponentProperty("FastLoadKeepNulls", false);
mcwDemo3.SetComponentProperty("FastLoadOptions", "TABLOCK,CHECK_CONSTRAINTS,ROWS_PER_BATCH = " + RowsPerBatch);
RefreshMetadata(mcwDemo3);
//demo4
mcwDemo4.ProvideComponentProperties();
icmDemo4.Name = "tblDemo4";
icmDemo4.Description = "tblDemo4";
icmDemo4.RuntimeConnectionCollection[0].ConnectionManagerID = destination.ID;
icmDemo4.RuntimeConnectionCollection[0].ConnectionManager = DtsConvert.GetExtendedInterface(destination);
mcwDemo4.SetComponentProperty("AccessMode", 0);//FastLoad
mcwDemo4.SetComponentProperty("OpenRowset", tblDemo4);
mcwDemo4.Validate();
mcwDemo4.SetComponentProperty("FastLoadKeepIdentity", true);
mcwDemo4.SetComponentProperty("FastLoadMaxInsertCommitSize", FastLoadMaxCommitSize);
mcwDemo4.SetComponentProperty("FastLoadKeepNulls", false);
mcwDemo4.SetComponentProperty("FastLoadOptions", "TABLOCK,CHECK_CONSTRAINTS,ROWS_PER_BATCH = " + RowsPerBatch);
RefreshMetadata(mcwDemo4);
//Add RowCount
IDTSComponentMetaData100 icmdRowCount = AddComponentMetadata(dfDataFlow, "RowCountComponent");
icmdRowCount.ComponentClassID = "DTSTransform.RowCount.2";
CManagedComponentWrapper mcwRowCount = icmdRowCount.Instantiate();
mcwRowCount.ProvideComponentProperties();
mcwRowCount.SetComponentProperty("VariableName", "User::iRowCnt");
// Add multiCast
IDTSComponentMetaData100 icmMulticast = AddComponentMetadata(dfDataFlow, "MulticastComponent");
icmMulticast.ComponentClassID = "DTSTransform.Multicast.2";
CManagedComponentWrapper mcwMulticast = icmMulticast.Instantiate();
mcwMulticast.ProvideComponentProperties();
icmMulticast.Name = "Multicast Demo";
//join RowCnt and Source
IDTSPath100 iphSource2RowCnt = dfDataFlow.PathCollection.New();
iphSource2RowCnt.AttachPathAndPropagateNotifications(icmdSource.OutputCollection[0], icmdRowCount.InputCollection[0]);
// join RowCnt and multicast
IDTSPath100 iphRowCnt2Multicast = dfDataFlow.PathCollection.New();
iphRowCnt2Multicast.AttachPathAndPropagateNotifications(icmdRowCount.OutputCollection[0], icmMulticast.InputCollection[0]);
//Join Multicast[0]
IDTSPath100 iphMulticast02Demo2 = dfDataFlow.PathCollection.New();
iphMulticast02Demo2.AttachPathAndPropagateNotifications(icmMulticast.OutputCollection[0], icmDemo4.InputCollection[0]);
//join multicast[1]
IDTSPath100 iphMulticast02Demo3 = dfDataFlow.PathCollection.New();
iphMulticast02Demo3.AttachPathAndPropagateNotifications(icmMulticast.OutputCollection[1], icmDemo2.InputCollection[0]);
// join multicat[2]
IDTSPath100 iphMulticast02Demo4 = dfDataFlow.PathCollection.New();
iphMulticast02Demo4.AttachPathAndPropagateNotifications(icmMulticast.OutputCollection[2], icmDemo3.InputCollection[0]);
// map columns
MappColumns(icmDemo4, mcwDemo4);
MappColumns(icmDemo2, mcwDemo2);
MappColumns(icmDemo3, mcwDemo3);
// Add Event Handler
DtsEventHandler ehOnError = (DtsEventHandler)seqSequence.EventHandlers.Add("OnError");
Executable execEvent = ehOnError.Executables.Add("STOCK:SQLTASK");
AddSQLTask(execEvent, "SQLTASKOnError DEMO", "destination", SqlCmd, mevent.Name);
String SaveLocation = @"c:\\temp";
SavePackage(SaveLocation);
}
public Microsoft.SqlServer.Dts.Runtime.Package GetSetPackage
{
get { return myPackage; }
set { myPackage = value; }
}
public String PackageName
{
set { myPackage.Name = value; }
}
public void AddLogging(String ConnectionName, Boolean Enable)
{
LogProvider pkLogging;
pkLogging = myPackage.LogProviders.Add("DTS.LogProviderSQLServer.2");
pkLogging.ConfigString = ConnectionName;
pkLogging.Description = "SQL Server Logging ";
myPackage.LoggingOptions.SelectedLogProviders.Add(pkLogging);
myPackage.LoggingOptions.EventFilterKind = Microsoft.SqlServer.Dts.Runtime.DTSEventFilterKind.Inclusion;
myPackage.LoggingOptions.EventFilter = new String[] { "OnPreExecute", "OnPostExecute", "OnError" };
switch (Enable)
{
case true:
myPackage.LoggingMode = Microsoft.SqlServer.Dts.Runtime.DTSLoggingMode.Enabled;
break;
case false:
myPackage.LoggingMode = Microsoft.SqlServer.Dts.Runtime.DTSLoggingMode.Disabled;
break;
default:
myPackage.LoggingMode = Microsoft.SqlServer.Dts.Runtime.DTSLoggingMode.Disabled;
break;
}
}
public void AddOLEDBConnection(String ConnectionName, String ConnectionStr)
{
ConnectionManager ConMgr = myPackage.Connections.Add("OLEDB");
ConMgr.ConnectionString = ConnectionStr + "Packet Size=32076;";
ConMgr.Name = ConnectionName;
ConMgr.Description = "SQL OLEDB using " + ConnectionName;
}
private void MappColumns(IDTSComponentMetaData100 icmDest, CManagedComponentWrapper mcwDest)
{
//mappings
IDTSInput100 inpDestination = icmDest.InputCollection[0];
IDTSVirtualInput100 vinpDestination = inpDestination.GetVirtualInput();
IDTSVirtualInputColumnCollection100 vinpcDestination = (IDTSVirtualInputColumnCollection100)vinpDestination.VirtualInputColumnCollection;
foreach (IDTSVirtualInputColumn100 vcolumn in vinpDestination.VirtualInputColumnCollection)
{
try
{
IDTSInputColumn100 inputColumn = mcwDest.SetUsageType(inpDestination.ID, vinpDestination, vcolumn.LineageID, DTSUsageType.UT_READONLY);
// map external column
IDTSExternalMetadataColumn100 extColumn = inpDestination.ExternalMetadataColumnCollection[inputColumn.Name];
mcwDest.MapInputColumn(inpDestination.ID, inputColumn.ID, extColumn.ID);
}
catch
{
IDTSInputColumn100 inputColumn = mcwDest.SetUsageType(inpDestination.ID, vinpDestination, vcolumn.LineageID, DTSUsageType.UT_IGNORED);
// map external column
}
}
}
private IDTSComponentMetaData100 AddComponentMetadata(MainPipe wf, String ComponentName)
{
IDTSComponentMetaData100 icmd = wf.ComponentMetaDataCollection.New();
return icmd;
}
private void AddSQLTask(Executable exec, String TaskName, String Connection, String SqlCmd, String VariableName)
{
Microsoft.SqlServer.Dts.Runtime.TaskHost sqlTaskHost = (Microsoft.SqlServer.Dts.Runtime.TaskHost)exec;
//Add Properties
sqlTaskHost.Properties["Name"].SetValue(sqlTaskHost, TaskName);
sqlTaskHost.Properties["Description"].SetValue(sqlTaskHost, TaskName);
sqlTaskHost.Properties["Connection"].SetValue(sqlTaskHost, Connection);
sqlTaskHost.Properties["SqlStatementSource"].SetValue(sqlTaskHost, SqlCmd);
IDTSExecuteSQL iexecSqlTask = (IDTSExecuteSQL)sqlTaskHost.InnerObject;
iexecSqlTask.ResultSetType = ResultSetType.ResultSetType_SingleRow;
iexecSqlTask.SqlStatementSourceType = SqlStatementSourceType.DirectInput;
IDTSResultBinding isqlTaskResult = iexecSqlTask.ResultSetBindings.Add();
isqlTaskResult.DtsVariableName = VariableName;
}
private void SavePackage(String DirectoryPath)
{
String FullFilePath = DirectoryPath + "\\" + myPackage.Name.ToString() + ".dtsx";
if (File.Exists(FullFilePath))
{
File.Delete(FullFilePath);
Microsoft.SqlServer.Dts.Runtime.Application app = new Microsoft.SqlServer.Dts.Runtime.Application();
app.SaveToXml(FullFilePath, myPackage, null);
}
else
{
Microsoft.SqlServer.Dts.Runtime.Application app = new Microsoft.SqlServer.Dts.Runtime.Application();
app.SaveToXml(FullFilePath, myPackage, null);
}
}
private void RefreshMetadata(CManagedComponentWrapper mcw)
{
//Reinitialize the metadata, Refresh Columns
mcw.AcquireConnections(null);
mcw.ReinitializeMetaData();
mcw.ReleaseConnections();
}
private Executable AddSQLExecutable(String MONIKER)
{
Executable exec = myPackage.Executables.Add(MONIKER);
return exec;
}
protected void Dispose(bool disposing)
{
if (disposing)
{
// dispose managed resources
myPackage.Dispose();
}
// free native resources
}
public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}
}
}
No comments:
Post a Comment