I have received several request on how to create an ssis generator using a metadata table. After some thoughts I decided to create a simple example instead of providing a total solution; is amazing what you can do in a few hours under the Quick and Dirty.
I wrote some code from scratch, but, after a little I said,wait a second
I already wrote code that can do that. So, after a quick lines and some major copy and paste and test here is the basic process of creating a dynamic package.
1.- Define your metadata table
Here is the Script
CREATE TABLE [dbo].[MetadataSample](
[MetdataSampleId] [int] IDENTITY(1,1) NOT NULL,
[ETLGroupId] [smallint] NOT NULL,
[SourceServer] [varchar](80) NULL,
[SourceDatabase] [varchar](80) NULL,
[SourceSchema] [varchar](10) NULL,
[SourceTable] [varchar](80) NULL,
[SourceSqlCmd] [varchar](8000) NULL,
[DestinationServer] [varchar](80) NULL,
[DestinationDatabase] [varchar](80) NULL,
[DestinationSchema] [varchar](10) NULL,
[DestinationTable] [varchar](80) NULL,
[IsFastLoad] [bit] NOT NULL,
[IsActive] [bit] NOT NULL,
[IsEnable] [bit] NOT NULL,
[CreateDtTm] [datetime] NOT NULL,
[CreatedBy] [varchar](50) NOT NULL,
[ModifyDtTm] [datetime] NULL,
[ModifiedBy] [varchar](50) NULL,
PRIMARY KEY CLUSTERED
(
[MetdataSampleId] ASC
)WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON) ON [PRIMARY],
UNIQUE NONCLUSTERED
(
[SourceServer] ASC,
[SourceDatabase] ASC,
[SourceSchema] ASC,
[SourceTable] ASC,
[DestinationServer] ASC,
[DestinationDatabase] ASC,
[DestinationSchema] ASC,
[DestinationTable] ASC
)WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON) ON [PRIMARY]
) ON [PRIMARY]
GO
SET ANSI_PADDING OFF
GO
ALTER TABLE [dbo].[MetadataSample] ADD DEFAULT ((1)) FOR [IsFastLoad]
GO
ALTER TABLE [dbo].[MetadataSample] ADD DEFAULT ((1)) FOR [IsActive]
GO
ALTER TABLE [dbo].[MetadataSample] ADD DEFAULT ((1)) FOR [IsEnable]
GO
ALTER TABLE [dbo].[MetadataSample] ADD DEFAULT (getutcdate()) FOR [CreateDtTm]
GO
ALTER TABLE [dbo].[MetadataSample] ADD DEFAULT (suser_sname()) FOR [CreatedBy]
GO
ALTER TABLE [dbo].[MetadataSample] ADD DEFAULT (getutcdate()) FOR [ModifyDtTm]
GO
ALTER TABLE [dbo].[MetadataSample] ADD DEFAULT (suser_sname()) FOR [ModifiedBy]
GO
2.- Create an ssis package that will process the metadata table
2B.- Get Metadata
SELECT
[SourceServer]
,[SourceDatabase]
,[SourceSchema]
,[SourceTable]
,[SourceSqlCmd]
,[DestinationServer]
,[DestinationDatabase]
,[DestinationSchema]
,[DestinationTable]
,[IsFastLoad]
[MetdataSampleId]
FROM [dbo].[MetadataSample]
where IsActive = 1
A.) Define Variables:
B.) Create SSIS Package
C.) Write Code in SQL Script task
using System;
using System.Data;
using System.IO;
using System.Windows.Forms;
using Microsoft.SqlServer.Dts.Runtime;
using Microsoft.SqlServer.Dts.Runtime.Wrapper;
using Microsoft.SqlServer.Dts.Pipeline;
using Microsoft.SqlServer.Dts.Pipeline.Wrapper;
namespace ST_7eb66dcac2324fe8825dced0e2d537b7.csproj
{
[System.AddIn.AddIn("ScriptMain", Version = "1.0", Publisher = "", Description = "")]
public partial class ScriptMain : Microsoft.SqlServer.Dts.Tasks.ScriptTask.VSTARTScriptObjectModelBase
{
#region VSTA generated code
enum ScriptResults
{
Success = Microsoft.SqlServer.Dts.Runtime.DTSExecResult.Success,
Failure = Microsoft.SqlServer.Dts.Runtime.DTSExecResult.Failure
};
#endregion
Microsoft.SqlServer.Dts.Runtime.Package myPackage = new Microsoft.SqlServer.Dts.Runtime.Package();
public void Main()
{
// Prepared Variables to recieved Metadata
//read
Variables vars = null;
Dts.VariableDispenser.LockForRead("User::bol_IsFastLoad");
Dts.VariableDispenser.LockForRead("User::bol_IsSqlCmd");
Dts.VariableDispenser.LockForRead("User::str_SourceServer");
Dts.VariableDispenser.LockForRead("User::str_SourceDatabase");
Dts.VariableDispenser.LockForRead("User::str_SourceSchema");
Dts.VariableDispenser.LockForRead("User::str_SourceTable");
Dts.VariableDispenser.LockForRead("User::str_SourceSqlCmd");
Dts.VariableDispenser.LockForRead("User::str_DestinationServer");
Dts.VariableDispenser.LockForRead("User::str_DestinationDatabase");
Dts.VariableDispenser.LockForRead("User::str_DestinationSchema");
Dts.VariableDispenser.LockForRead("User::str_DestinationTable");
Dts.VariableDispenser.LockForRead("User::str_SavePackagePath");
//write
Dts.VariableDispenser.LockForRead("User::int_SourceCnt");
Dts.VariableDispenser.GetVariables(ref vars);
// Set Variables
String SourceServer = vars["User::str_SourceServer"].Value.ToString();
String SourceDatabase = vars["str_SourceDatabase"].Value.ToString();
String SourceSchema = vars["User::str_SourceSchema"].Value.ToString();
String SourceTable = vars["User::str_SourceTable"].Value.ToString();
String SourceSqlCmd = vars["User::str_SourceSqlCmd"].Value.ToString();
String DestinationServer = vars["User::str_DestinationServer"].Value.ToString();
String DestinationDatabase = vars["User::str_DestinationDatabase"].Value.ToString();
String DestinationSchema = vars["User::str_DestinationSchema"].Value.ToString();
String DestinationTable = vars["User::str_DestinationTable"].Value.ToString();
String FilePath = vars["User::str_SavePackagePath"].Value.ToString();
Boolean IsFastLoad = (Boolean)vars["User::bol_IsFastLoad"].Value;
Boolean IsSqlCmd = (Boolean)vars["User::bol_IsSqlCmd"].Value;
int SourceCnt = (int)vars["User::int_SourceCnt"].Value;
myPackage.Name = "SSISFrom" + SourceTable + "2" + DestinationTable;
//Create Connections ( Source Destination )
String SourceConnStr = "Data Source=" + SourceServer + ";Initial Catalog=" + SourceDatabase + ";Provider=SQLNCLI10.1;Integrated Security=SSPI;";
String DestinationConnStr = "Data Source=" + DestinationServer + ";Initial Catalog=" + DestinationDatabase + ";Provider=SQLNCLI10.1;Integrated Security=SSPI;";
//Connection Managers
ConnectionManager ConMgrSource;
ConnectionManager ConMgrDestination;
ConMgrSource = myPackage.Connections.Add("OLEDB");
ConMgrSource.Name = "source";
ConMgrSource.ConnectionString = SourceConnStr;
ConMgrDestination = myPackage.Connections.Add("OLEDB");
ConMgrDestination.Name = "destination";
ConMgrDestination.ConnectionString = DestinationConnStr;
//Add Source
//Add a Sequence Container
Executable execSequence = AddSQLExecutable("STOCK:Sequence");
Microsoft.SqlServer.Dts.Runtime.Sequence seqSequence = (Microsoft.SqlServer.Dts.Runtime.Sequence)execSequence;
//add Properties
seqSequence.Name = "SEQUENCE Demo";
seqSequence.LoggingMode = Microsoft.SqlServer.Dts.Runtime.DTSLoggingMode.UseParentSetting;
seqSequence.Description = "Metadata Sample Sequence";
//Add Work Flow
Executable execDataFlow = seqSequence.Executables.Add("STOCK:PipelineTask");
//Wrapper
Microsoft.SqlServer.Dts.Runtime.TaskHost thDataFlow = (Microsoft.SqlServer.Dts.Runtime.TaskHost)execDataFlow;
thDataFlow.Name = "DataFlow Demo";
//reference thHost pipe
MainPipe dfDataFlow = (MainPipe)thDataFlow.InnerObject;
//Add Source Component to package
IDTSComponentMetaData100 icmdSource = AddComponentMetadata(dfDataFlow, "SourceComponet");
icmdSource.ComponentClassID = "DTSAdapter.OleDbSource.2";
IDTSComponentMetaData100 icmDemo2 = AddComponentMetadata(dfDataFlow, "DestinationComponent");
icmDemo2.ComponentClassID = "DTSAdapter.OLEDBDestination.2";
icmDemo2.Name = "Destination";
icmDemo2.Description = "Destination";
//Source Configuration
ConnectionManager source = myPackage.Connections["source"];
ConnectionManager destination = myPackage.Connections["destination"];
CManagedComponentWrapper mcwSource = icmdSource.Instantiate();
mcwSource.ProvideComponentProperties();
icmdSource.RuntimeConnectionCollection[0].ConnectionManagerID = ConMgrSource.ID;
icmdSource.RuntimeConnectionCollection[0].ConnectionManager = DtsConvert.GetExtendedInterface(ConMgrSource);
if (IsSqlCmd == true)
{
mcwSource.SetComponentProperty("AccessMode", 2);
mcwSource.SetComponentProperty("SqlCommand", SourceSqlCmd);
}
else
{
mcwSource.SetComponentProperty("AccessMode", 0); //TableOrView
mcwSource.SetComponentProperty("OpenRowset", SourceSchema + "." + SourceTable);
}
RefreshMetadata(mcwSource);
icmDemo2.ValidateExternalMetadata = true;
CManagedComponentWrapper mcwDemo2 = icmDemo2.Instantiate();
// tbldemo2
mcwDemo2.ProvideComponentProperties();
icmDemo2.RuntimeConnectionCollection[0].ConnectionManagerID = ConMgrDestination.ID;
icmDemo2.RuntimeConnectionCollection[0].ConnectionManager = DtsConvert.GetExtendedInterface(ConMgrDestination);
mcwDemo2.SetComponentProperty("OpenRowset", DestinationSchema + "." + DestinationTable);
mcwDemo2.SetComponentProperty("AlwaysUseDefaultCodePage", false);
if (IsFastLoad == true)
{
mcwDemo2.SetComponentProperty("AccessMode", 3);//FastLoad
}
else
{
mcwDemo2.SetComponentProperty("AccessMode", 0);//table view
}
mcwDemo2.SetComponentProperty("FastLoadKeepIdentity", true);
mcwDemo2.SetComponentProperty("FastLoadKeepNulls", false);
mcwDemo2.SetComponentProperty("FastLoadOptions", "TABLOCK, CHECK_CONSTRAINTS");
mcwDemo2.SetComponentProperty("FastLoadMaxInsertCommitSize", 0);
RefreshMetadata(mcwDemo2);
//Connect Componnents
IDTSPath100 Source2Destination = dfDataFlow.PathCollection.New();
Source2Destination.AttachPathAndPropagateNotifications(icmdSource.OutputCollection[0], icmDemo2.InputCollection[0]);
//Mappings
MappColumns(icmDemo2, mcwDemo2);
// Execute Package
myPackage.Execute();
if(myPackage.ExecutionResult == Microsoft.SqlServer.Dts.Runtime.DTSExecResult.Failure || myPackage.ExecutionStatus == Microsoft.SqlServer.Dts.Runtime.DTSExecStatus.Abend)
{
myPackage.Name = myPackage.Name + "_FailPackage";
SavePackage(FilePath);
}
vars.Unlock();
Dts.TaskResult = (int)ScriptResults.Success;
}
private void MappColumns(IDTSComponentMetaData100 icmDest, CManagedComponentWrapper mcwDest)
{
//mappings
IDTSInput100 inpDestination = icmDest.InputCollection[0];
IDTSVirtualInput100 vinpDestination = inpDestination.GetVirtualInput();
IDTSVirtualInputColumnCollection100 vinpcDestination = (IDTSVirtualInputColumnCollection100)vinpDestination.VirtualInputColumnCollection;
foreach (IDTSVirtualInputColumn100 vcolumn in vinpDestination.VirtualInputColumnCollection)
{
try
{
IDTSInputColumn100 inputColumn = mcwDest.SetUsageType(inpDestination.ID, vinpDestination, vcolumn.LineageID, DTSUsageType.UT_READONLY);
// map external column
IDTSExternalMetadataColumn100 extColumn = inpDestination.ExternalMetadataColumnCollection[inputColumn.Name];
mcwDest.MapInputColumn(inpDestination.ID, inputColumn.ID, extColumn.ID);
}
catch
{
IDTSInputColumn100 inputColumn = mcwDest.SetUsageType(inpDestination.ID, vinpDestination, vcolumn.LineageID, DTSUsageType.UT_IGNORED);
// map external column
}
}
}
private IDTSComponentMetaData100 AddComponentMetadata(MainPipe wf, String ComponentName)
{
IDTSComponentMetaData100 icmd = wf.ComponentMetaDataCollection.New();
return icmd;
}
private void SavePackage(String DirectoryPath)
{
String FullFilePath = DirectoryPath + "\\" + myPackage.Name.ToString() + ".dtsx";
if (File.Exists(FullFilePath))
{
File.Delete(FullFilePath);
Microsoft.SqlServer.Dts.Runtime.Application app = new Microsoft.SqlServer.Dts.Runtime.Application();
app.SaveToXml(FullFilePath, myPackage, null);
}
else
{
Microsoft.SqlServer.Dts.Runtime.Application app = new Microsoft.SqlServer.Dts.Runtime.Application();
app.SaveToXml(FullFilePath, myPackage, null);
}
}
private void RefreshMetadata(CManagedComponentWrapper mcw)
{
//Reinitialize the metadata, Refresh Columns
mcw.AcquireConnections(null);
mcw.ReinitializeMetaData();
mcw.ReleaseConnections();
mcw.Validate();
}
private Executable AddSQLExecutable(String MONIKER)
{
Executable exec = myPackage.Executables.Add(MONIKER);
return exec;
}
}
}
That's all there is. There are other examples on how to save the package in this case I just executed it.
basically we just create a simple data flow with a source and destination components.
Happy Coding.
No comments:
Post a Comment