May 07, 2012

Metadata Driving Package Generator

I have received several request on how to create an ssis generator using a metadata table. After some thoughts I decided to create a simple example instead of providing a total solution; is amazing what you can do in a few hours under the Quick and Dirty.
I wrote some code from scratch, but, after a little I said,wait a second
I already wrote code that can do that. So, after a quick lines and some major copy and paste and test here is the basic process of creating a dynamic package.



1.- Define your metadata table


Here is the Script

CREATE TABLE [dbo].[MetadataSample](
    [MetdataSampleId] [int] IDENTITY(1,1) NOT NULL,
    [ETLGroupId] [smallint] NOT NULL,
    [SourceServer] [varchar](80) NULL,
    [SourceDatabase] [varchar](80) NULL,
    [SourceSchema] [varchar](10) NULL,
    [SourceTable] [varchar](80) NULL,
    [SourceSqlCmd] [varchar](8000) NULL,
    [DestinationServer] [varchar](80) NULL,
    [DestinationDatabase] [varchar](80) NULL,
    [DestinationSchema] [varchar](10) NULL,
    [DestinationTable] [varchar](80) NULL,
    [IsFastLoad] [bit] NOT NULL,
    [IsActive] [bit] NOT NULL,
    [IsEnable] [bit] NOT NULL,
    [CreateDtTm] [datetime] NOT NULL,
    [CreatedBy] [varchar](50) NOT NULL,
    [ModifyDtTm] [datetime] NULL,
    [ModifiedBy] [varchar](50) NULL,
PRIMARY KEY CLUSTERED
(
    [MetdataSampleId] ASC
)WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON) ON [PRIMARY],
UNIQUE NONCLUSTERED
(
    [SourceServer] ASC,
    [SourceDatabase] ASC,
    [SourceSchema] ASC,
    [SourceTable] ASC,
    [DestinationServer] ASC,
    [DestinationDatabase] ASC,
    [DestinationSchema] ASC,
    [DestinationTable] ASC
)WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON) ON [PRIMARY]
) ON [PRIMARY]

GO

SET ANSI_PADDING OFF
GO

ALTER TABLE [dbo].[MetadataSample] ADD  DEFAULT ((1)) FOR [IsFastLoad]
GO

ALTER TABLE [dbo].[MetadataSample] ADD  DEFAULT ((1)) FOR [IsActive]
GO

ALTER TABLE [dbo].[MetadataSample] ADD  DEFAULT ((1)) FOR [IsEnable]
GO

ALTER TABLE [dbo].[MetadataSample] ADD  DEFAULT (getutcdate()) FOR [CreateDtTm]
GO

ALTER TABLE [dbo].[MetadataSample] ADD  DEFAULT (suser_sname()) FOR [CreatedBy]
GO

ALTER TABLE [dbo].[MetadataSample] ADD  DEFAULT (getutcdate()) FOR [ModifyDtTm]
GO

ALTER TABLE [dbo].[MetadataSample] ADD  DEFAULT (suser_sname()) FOR [ModifiedBy]
GO


2.- Create an ssis package that will process the metadata table
2B.- Get Metadata
SELECT
       [SourceServer]
      ,[SourceDatabase]
      ,[SourceSchema]
      ,[SourceTable]
      ,[SourceSqlCmd]
      ,[DestinationServer]
      ,[DestinationDatabase]
      ,[DestinationSchema]
      ,[DestinationTable]
      ,[IsFastLoad]
      [MetdataSampleId]
  FROM [dbo].[MetadataSample]
  where IsActive = 1

A.) Define Variables:

B.) Create SSIS Package

C.) Write Code in SQL Script task
using System;
using System.Data;
using System.IO;
using System.Windows.Forms;
using Microsoft.SqlServer.Dts.Runtime;
using Microsoft.SqlServer.Dts.Runtime.Wrapper;
using Microsoft.SqlServer.Dts.Pipeline;
using Microsoft.SqlServer.Dts.Pipeline.Wrapper;

namespace ST_7eb66dcac2324fe8825dced0e2d537b7.csproj
{
    [System.AddIn.AddIn("ScriptMain", Version = "1.0", Publisher = "", Description = "")]
    public partial class ScriptMain : Microsoft.SqlServer.Dts.Tasks.ScriptTask.VSTARTScriptObjectModelBase
    {

        #region VSTA generated code
        enum ScriptResults
        {
            Success = Microsoft.SqlServer.Dts.Runtime.DTSExecResult.Success,
            Failure = Microsoft.SqlServer.Dts.Runtime.DTSExecResult.Failure
        };
        #endregion
        Microsoft.SqlServer.Dts.Runtime.Package myPackage = new Microsoft.SqlServer.Dts.Runtime.Package();

        public void Main()
        {

            // Prepared Variables to recieved Metadata
            //read
            Variables vars = null;
            Dts.VariableDispenser.LockForRead("User::bol_IsFastLoad");
            Dts.VariableDispenser.LockForRead("User::bol_IsSqlCmd");
            Dts.VariableDispenser.LockForRead("User::str_SourceServer");
            Dts.VariableDispenser.LockForRead("User::str_SourceDatabase");
            Dts.VariableDispenser.LockForRead("User::str_SourceSchema");
            Dts.VariableDispenser.LockForRead("User::str_SourceTable");
            Dts.VariableDispenser.LockForRead("User::str_SourceSqlCmd");
            Dts.VariableDispenser.LockForRead("User::str_DestinationServer");
            Dts.VariableDispenser.LockForRead("User::str_DestinationDatabase");
            Dts.VariableDispenser.LockForRead("User::str_DestinationSchema");
            Dts.VariableDispenser.LockForRead("User::str_DestinationTable");
            Dts.VariableDispenser.LockForRead("User::str_SavePackagePath");
            //write
            Dts.VariableDispenser.LockForRead("User::int_SourceCnt");

          
           
            Dts.VariableDispenser.GetVariables(ref vars);
          

            // Set Variables
           
            String SourceServer = vars["User::str_SourceServer"].Value.ToString();
            String SourceDatabase = vars["str_SourceDatabase"].Value.ToString();
            String SourceSchema = vars["User::str_SourceSchema"].Value.ToString();
            String SourceTable = vars["User::str_SourceTable"].Value.ToString();
            String SourceSqlCmd = vars["User::str_SourceSqlCmd"].Value.ToString();

            String DestinationServer = vars["User::str_DestinationServer"].Value.ToString();
            String DestinationDatabase = vars["User::str_DestinationDatabase"].Value.ToString();
            String DestinationSchema = vars["User::str_DestinationSchema"].Value.ToString();
            String DestinationTable = vars["User::str_DestinationTable"].Value.ToString();
            String FilePath = vars["User::str_SavePackagePath"].Value.ToString();

            Boolean IsFastLoad = (Boolean)vars["User::bol_IsFastLoad"].Value;
            Boolean IsSqlCmd = (Boolean)vars["User::bol_IsSqlCmd"].Value;
            int SourceCnt = (int)vars["User::int_SourceCnt"].Value;
            myPackage.Name = "SSISFrom" + SourceTable + "2" + DestinationTable;

            //Create Connections ( Source Destination )

            String SourceConnStr = "Data Source=" + SourceServer + ";Initial Catalog=" + SourceDatabase + ";Provider=SQLNCLI10.1;Integrated Security=SSPI;";
            String DestinationConnStr = "Data Source=" + DestinationServer + ";Initial Catalog=" + DestinationDatabase + ";Provider=SQLNCLI10.1;Integrated Security=SSPI;";

            //Connection Managers
            ConnectionManager ConMgrSource;
            ConnectionManager ConMgrDestination;
            ConMgrSource = myPackage.Connections.Add("OLEDB");
            ConMgrSource.Name = "source";
            ConMgrSource.ConnectionString = SourceConnStr;
            ConMgrDestination = myPackage.Connections.Add("OLEDB");
            ConMgrDestination.Name = "destination";
            ConMgrDestination.ConnectionString = DestinationConnStr;

            //Add Source
            //Add a Sequence Container
            Executable execSequence = AddSQLExecutable("STOCK:Sequence");
            Microsoft.SqlServer.Dts.Runtime.Sequence seqSequence = (Microsoft.SqlServer.Dts.Runtime.Sequence)execSequence;
            //add Properties
            seqSequence.Name = "SEQUENCE Demo";
            seqSequence.LoggingMode = Microsoft.SqlServer.Dts.Runtime.DTSLoggingMode.UseParentSetting;
            seqSequence.Description = "Metadata Sample Sequence";

            //Add Work Flow
            Executable execDataFlow = seqSequence.Executables.Add("STOCK:PipelineTask");
            //Wrapper
            Microsoft.SqlServer.Dts.Runtime.TaskHost thDataFlow = (Microsoft.SqlServer.Dts.Runtime.TaskHost)execDataFlow;
            thDataFlow.Name = "DataFlow Demo";
            //reference thHost pipe
            MainPipe dfDataFlow = (MainPipe)thDataFlow.InnerObject;

            //Add Source Component to package
            IDTSComponentMetaData100 icmdSource = AddComponentMetadata(dfDataFlow, "SourceComponet");
            icmdSource.ComponentClassID = "DTSAdapter.OleDbSource.2";

            IDTSComponentMetaData100 icmDemo2 = AddComponentMetadata(dfDataFlow, "DestinationComponent");
            icmDemo2.ComponentClassID = "DTSAdapter.OLEDBDestination.2";
            icmDemo2.Name = "Destination";
            icmDemo2.Description = "Destination";
          
            //Source Configuration
            ConnectionManager source = myPackage.Connections["source"];
            ConnectionManager destination = myPackage.Connections["destination"];
            CManagedComponentWrapper mcwSource = icmdSource.Instantiate();
            mcwSource.ProvideComponentProperties();
            icmdSource.RuntimeConnectionCollection[0].ConnectionManagerID = ConMgrSource.ID;
            icmdSource.RuntimeConnectionCollection[0].ConnectionManager = DtsConvert.GetExtendedInterface(ConMgrSource);

            if (IsSqlCmd == true)
            {
                mcwSource.SetComponentProperty("AccessMode", 2);
                mcwSource.SetComponentProperty("SqlCommand", SourceSqlCmd);
            }
            else
            {
                mcwSource.SetComponentProperty("AccessMode", 0); //TableOrView
                mcwSource.SetComponentProperty("OpenRowset", SourceSchema + "." + SourceTable);
            }
            RefreshMetadata(mcwSource);

            icmDemo2.ValidateExternalMetadata = true;
            CManagedComponentWrapper mcwDemo2 = icmDemo2.Instantiate();
            // tbldemo2
            mcwDemo2.ProvideComponentProperties();
            icmDemo2.RuntimeConnectionCollection[0].ConnectionManagerID = ConMgrDestination.ID;
            icmDemo2.RuntimeConnectionCollection[0].ConnectionManager = DtsConvert.GetExtendedInterface(ConMgrDestination);
            mcwDemo2.SetComponentProperty("OpenRowset", DestinationSchema + "." + DestinationTable);
            mcwDemo2.SetComponentProperty("AlwaysUseDefaultCodePage", false);
            if (IsFastLoad == true)
            {
                mcwDemo2.SetComponentProperty("AccessMode", 3);//FastLoad
            }
            else
            {
                mcwDemo2.SetComponentProperty("AccessMode", 0);//table view
            }
           
            mcwDemo2.SetComponentProperty("FastLoadKeepIdentity", true);
            mcwDemo2.SetComponentProperty("FastLoadKeepNulls", false);
            mcwDemo2.SetComponentProperty("FastLoadOptions", "TABLOCK, CHECK_CONSTRAINTS");
            mcwDemo2.SetComponentProperty("FastLoadMaxInsertCommitSize", 0);
            RefreshMetadata(mcwDemo2);
          

            //Connect Componnents
            IDTSPath100 Source2Destination = dfDataFlow.PathCollection.New();
            Source2Destination.AttachPathAndPropagateNotifications(icmdSource.OutputCollection[0], icmDemo2.InputCollection[0]);

            //Mappings
            MappColumns(icmDemo2, mcwDemo2);

            // Execute Package
           
            myPackage.Execute();
            if(myPackage.ExecutionResult == Microsoft.SqlServer.Dts.Runtime.DTSExecResult.Failure || myPackage.ExecutionStatus == Microsoft.SqlServer.Dts.Runtime.DTSExecStatus.Abend)
            {
                myPackage.Name = myPackage.Name + "_FailPackage";
                SavePackage(FilePath);
            }

            vars.Unlock();
            Dts.TaskResult = (int)ScriptResults.Success;
        }
        private void MappColumns(IDTSComponentMetaData100 icmDest, CManagedComponentWrapper mcwDest)
        {
            //mappings

            IDTSInput100 inpDestination = icmDest.InputCollection[0];
            IDTSVirtualInput100 vinpDestination = inpDestination.GetVirtualInput();
            IDTSVirtualInputColumnCollection100 vinpcDestination = (IDTSVirtualInputColumnCollection100)vinpDestination.VirtualInputColumnCollection;


            foreach (IDTSVirtualInputColumn100 vcolumn in vinpDestination.VirtualInputColumnCollection)
            {
                try
                {
                    IDTSInputColumn100 inputColumn = mcwDest.SetUsageType(inpDestination.ID, vinpDestination, vcolumn.LineageID, DTSUsageType.UT_READONLY);
                    // map external column
                    IDTSExternalMetadataColumn100 extColumn = inpDestination.ExternalMetadataColumnCollection[inputColumn.Name];
                    mcwDest.MapInputColumn(inpDestination.ID, inputColumn.ID, extColumn.ID);
                }
                catch
                {
                    IDTSInputColumn100 inputColumn = mcwDest.SetUsageType(inpDestination.ID, vinpDestination, vcolumn.LineageID, DTSUsageType.UT_IGNORED);
                    // map external column

                }

            }


        }
        private IDTSComponentMetaData100 AddComponentMetadata(MainPipe wf, String ComponentName)
        {
            IDTSComponentMetaData100 icmd = wf.ComponentMetaDataCollection.New();
            return icmd;
        }


        private void SavePackage(String DirectoryPath)
        {
            String FullFilePath = DirectoryPath + "\\" + myPackage.Name.ToString() + ".dtsx";
            if (File.Exists(FullFilePath))
            {

                File.Delete(FullFilePath);
                Microsoft.SqlServer.Dts.Runtime.Application app = new Microsoft.SqlServer.Dts.Runtime.Application();
                app.SaveToXml(FullFilePath, myPackage, null);

            }
            else
            {
                Microsoft.SqlServer.Dts.Runtime.Application app = new Microsoft.SqlServer.Dts.Runtime.Application();
                app.SaveToXml(FullFilePath, myPackage, null);

            }
        }

        private void RefreshMetadata(CManagedComponentWrapper mcw)
        {
            //Reinitialize the metadata, Refresh Columns
            mcw.AcquireConnections(null);
            mcw.ReinitializeMetaData();
            mcw.ReleaseConnections();
            mcw.Validate();

        }


        private Executable AddSQLExecutable(String MONIKER)
        {
            Executable exec = myPackage.Executables.Add(MONIKER);
            return exec;
        }
    }
}

That's all there is. There are other examples on how to save the package in this case I just executed it.
basically we just create a simple data flow with a source and destination components.
Happy Coding.




No comments:

Post a Comment

Contact Form

Name

Email *

Message *