May 04, 2012

SQLSaturday #130 Demo 7

I decided that, instead of continuing to add one piece of code at a time, I would just post the final class used in the SQLSaturday #130 Schedule demonstration. The final class is nothing more than a SQLTask and a DataFlow with a row count and a multicast. The generated package is for SQL Server 2008.
I have received several requests on how to use the mapping from source to destination, and what better example than this? Hopefully this will answer some of the questions and, as always, everything is limited only by your potential. I have added a couple of pictures of how the package will look:


/*******************************************************************
 *
 * Jorge Novo
 * SQLSaturday SSIS Demonstration
 * jorge.novo@gmail.com
 * http://etldevelopernotes.blogspot.com/
 * 4/27/2012
 *
 * Package to demonstrate an empty SSIS package
 * Add Connection Strings and logging
 * Add SQL Task and Sequence Container
 * Add Data Flow
 * Add Source
 * Add Destination
 * Add multicast
 * Add rowcount
 * Add event handler to the sequence
 *
 * Class 7- 7

 *
 *
 *
 * *****************************************************************/















using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.IO;
using Microsoft.SqlServer.Dts.Runtime;
using Microsoft.SqlServer.Dts.Runtime.Wrapper;
using Microsoft.SqlServer.Dts.Pipeline.Wrapper;
using Microsoft.SqlServer.Dts.Tasks.ExecuteSQLTask;

namespace SQLSaturday130
{
    public class Demo7 : IDisposable
    {
        // The SSIS package that every method of this class adds to, saves and finally disposes.
        // Fully qualified because both Dts.Runtime and Dts.Runtime.Wrapper are imported.
        private Microsoft.SqlServer.Dts.Runtime.Package myPackage = new Microsoft.SqlServer.Dts.Runtime.Package();

        public void CreatePackage()
        {
            String SqlCmd = "Select count (*) from information_schema.tables";
            String Description = "SQLSaturday Demo";
            String SourceSchTable = "tblDemo1";
            String tblDemo2 = "tblDemo2";
            String tblDemo3 = "tblDemo3";
            String tblDemo4 = "tblDemo4";
            String RowsPerBatch = "10000";
            int FastLoadMaxCommitSize = 50000;
            //Add Variable to package
            Microsoft.SqlServer.Dts.Runtime.Variable mvar = myPackage.Variables.Add("iDestCnt", false, "User", 0);
            Microsoft.SqlServer.Dts.Runtime.Variable mrow = myPackage.Variables.Add("iRowCnt", false, "User", 0);
            Microsoft.SqlServer.Dts.Runtime.Variable mevent = myPackage.Variables.Add("iError", false, "User", 0);
            //Add and get executable reference to a SQLTAsk Component
            Executable execSqlTask = AddSQLExecutable("STOCK:SQLTASk");
            //Wrap the executable in a TaskHost
            AddSQLTask(execSqlTask, "SQLTASK DEMO", "destination", SqlCmd, mvar.Name);
            //Add a Sequence Container
            Executable execSequence = AddSQLExecutable("STOCK:Sequence");
            Microsoft.SqlServer.Dts.Runtime.Sequence seqSequence = (Microsoft.SqlServer.Dts.Runtime.Sequence)execSequence;
            //add Properties
            seqSequence.Name = "SEQUENCE Demo";
            seqSequence.LoggingMode = Microsoft.SqlServer.Dts.Runtime.DTSLoggingMode.UseParentSetting;
            seqSequence.Description = Description;

            //Set precedence Constraint
            Microsoft.SqlServer.Dts.Runtime.PrecedenceConstraint SqlTask2Sequece = myPackage.PrecedenceConstraints.Add(execSqlTask, execSequence);
            SqlTask2Sequece.Value = Microsoft.SqlServer.Dts.Runtime.DTSExecResult.Success;
            //Add Empty Work Flow

            //Add Work Flow
            Executable execDataFlow = seqSequence.Executables.Add("STOCK:PipelineTask");
            //Wrapper
            Microsoft.SqlServer.Dts.Runtime.TaskHost thDataFlow = (Microsoft.SqlServer.Dts.Runtime.TaskHost)execDataFlow;
            thDataFlow.Name = "DataFlow Demo";
            //reference thHost pipe
            MainPipe dfDataFlow = (MainPipe)thDataFlow.InnerObject;

            //Add Source Component to package
            IDTSComponentMetaData100 icmdSource = AddComponentMetadata(dfDataFlow, "SourceComponet");
            icmdSource.ComponentClassID = "DTSAdapter.OleDbSource.2";

            //Add Destinations components
            IDTSComponentMetaData100 icmDemo2 = AddComponentMetadata(dfDataFlow, "tblDemo2");
            icmDemo2.ComponentClassID = "DTSAdapter.OLEDBDestination.2";
            IDTSComponentMetaData100 icmDemo3 = AddComponentMetadata(dfDataFlow, "tblDemo3");
            icmDemo3.ComponentClassID = "DTSAdapter.OLEDBDestination.2";
            IDTSComponentMetaData100 icmDemo4 = AddComponentMetadata(dfDataFlow, "tblDemo4");
            icmDemo4.ComponentClassID = "DTSAdapter.OLEDBDestination.2";


            //Source Configuration
            ConnectionManager source = myPackage.Connections["source"];
            ConnectionManager destination = myPackage.Connections["destination"];
            CManagedComponentWrapper mcwSource = icmdSource.Instantiate();
            mcwSource.ProvideComponentProperties();
            icmdSource.RuntimeConnectionCollection[0].ConnectionManagerID = source.ID;
            icmdSource.RuntimeConnectionCollection[0].ConnectionManager = DtsConvert.GetExtendedInterface(source);

            //mcwSource.SetComponentProperty("AccessMode",2)//SQLCmd
            //mcwSource.SetComponentProperty("SqlCommand",SqlCmd);
            mcwSource.SetComponentProperty("AccessMode", 0); //TableOrView
            mcwSource.SetComponentProperty("OpenRowset", SourceSchTable);
            RefreshMetadata(mcwSource);

            //Destination Configuration

            icmDemo2.ValidateExternalMetadata = true;
            icmDemo3.ValidateExternalMetadata = true;
            icmDemo4.ValidateExternalMetadata = true;
            CManagedComponentWrapper mcwDemo2 = icmDemo2.Instantiate();
            CManagedComponentWrapper mcwDemo3 = icmDemo3.Instantiate();
            CManagedComponentWrapper mcwDemo4 = icmDemo4.Instantiate();

            // tbldemo2
            mcwDemo2.ProvideComponentProperties();
            icmDemo2.Name = "tblDemo2";
            icmDemo2.Description = "tblDemo2";
            icmDemo2.RuntimeConnectionCollection[0].ConnectionManagerID = destination.ID;
            icmDemo2.RuntimeConnectionCollection[0].ConnectionManager = DtsConvert.GetExtendedInterface(destination);
            mcwDemo2.SetComponentProperty("AccessMode", 0);//FastLoad
            mcwDemo2.SetComponentProperty("OpenRowset", tblDemo2);
            mcwDemo2.Validate();
            mcwDemo2.SetComponentProperty("FastLoadKeepIdentity", true);
            mcwDemo2.SetComponentProperty("FastLoadMaxInsertCommitSize", FastLoadMaxCommitSize);
            mcwDemo2.SetComponentProperty("FastLoadKeepNulls", false);
            mcwDemo2.SetComponentProperty("FastLoadOptions", "TABLOCK,CHECK_CONSTRAINTS,ROWS_PER_BATCH = " + RowsPerBatch);
            RefreshMetadata(mcwDemo2);

            // tbldemo3
            mcwDemo3.ProvideComponentProperties();
            icmDemo3.Name = "tblDemo3";
            icmDemo3.Description = "tblDemo3";
            icmDemo3.RuntimeConnectionCollection[0].ConnectionManagerID = destination.ID;
            icmDemo3.RuntimeConnectionCollection[0].ConnectionManager = DtsConvert.GetExtendedInterface(destination);
            mcwDemo3.SetComponentProperty("AccessMode", 0);//FastLoad
            mcwDemo3.SetComponentProperty("OpenRowset", tblDemo3);
            mcwDemo3.Validate();
            mcwDemo3.SetComponentProperty("FastLoadKeepIdentity", true);
            mcwDemo3.SetComponentProperty("FastLoadMaxInsertCommitSize", FastLoadMaxCommitSize);
            mcwDemo3.SetComponentProperty("FastLoadKeepNulls", false);
            mcwDemo3.SetComponentProperty("FastLoadOptions", "TABLOCK,CHECK_CONSTRAINTS,ROWS_PER_BATCH = " + RowsPerBatch);
            RefreshMetadata(mcwDemo3);

            //demo4
            mcwDemo4.ProvideComponentProperties();
            icmDemo4.Name = "tblDemo4";
            icmDemo4.Description = "tblDemo4";
            icmDemo4.RuntimeConnectionCollection[0].ConnectionManagerID = destination.ID;
            icmDemo4.RuntimeConnectionCollection[0].ConnectionManager = DtsConvert.GetExtendedInterface(destination);
            mcwDemo4.SetComponentProperty("AccessMode", 0);//FastLoad
            mcwDemo4.SetComponentProperty("OpenRowset", tblDemo4);
            mcwDemo4.Validate();
            mcwDemo4.SetComponentProperty("FastLoadKeepIdentity", true);
            mcwDemo4.SetComponentProperty("FastLoadMaxInsertCommitSize", FastLoadMaxCommitSize);
            mcwDemo4.SetComponentProperty("FastLoadKeepNulls", false);
            mcwDemo4.SetComponentProperty("FastLoadOptions", "TABLOCK,CHECK_CONSTRAINTS,ROWS_PER_BATCH = " + RowsPerBatch);
            RefreshMetadata(mcwDemo4);

            //Add RowCount
            IDTSComponentMetaData100 icmdRowCount = AddComponentMetadata(dfDataFlow, "RowCountComponent");
            icmdRowCount.ComponentClassID = "DTSTransform.RowCount.2";
            CManagedComponentWrapper mcwRowCount = icmdRowCount.Instantiate();
            mcwRowCount.ProvideComponentProperties();
            mcwRowCount.SetComponentProperty("VariableName", "User::iRowCnt");

            // Add multiCast
            IDTSComponentMetaData100 icmMulticast = AddComponentMetadata(dfDataFlow, "MulticastComponent");
            icmMulticast.ComponentClassID = "DTSTransform.Multicast.2";
            CManagedComponentWrapper mcwMulticast = icmMulticast.Instantiate();
            mcwMulticast.ProvideComponentProperties();
            icmMulticast.Name = "Multicast Demo";


            //join RowCnt and Source
            IDTSPath100 iphSource2RowCnt = dfDataFlow.PathCollection.New();
            iphSource2RowCnt.AttachPathAndPropagateNotifications(icmdSource.OutputCollection[0], icmdRowCount.InputCollection[0]);
            // join RowCnt and multicast
            IDTSPath100 iphRowCnt2Multicast = dfDataFlow.PathCollection.New();
            iphRowCnt2Multicast.AttachPathAndPropagateNotifications(icmdRowCount.OutputCollection[0], icmMulticast.InputCollection[0]);
            //Join Multicast[0]
            IDTSPath100 iphMulticast02Demo2 = dfDataFlow.PathCollection.New();
            iphMulticast02Demo2.AttachPathAndPropagateNotifications(icmMulticast.OutputCollection[0], icmDemo4.InputCollection[0]);
            //join multicast[1]
            IDTSPath100 iphMulticast02Demo3 = dfDataFlow.PathCollection.New();
            iphMulticast02Demo3.AttachPathAndPropagateNotifications(icmMulticast.OutputCollection[1], icmDemo2.InputCollection[0]);
            // join multicat[2]
            IDTSPath100 iphMulticast02Demo4 = dfDataFlow.PathCollection.New();
            iphMulticast02Demo4.AttachPathAndPropagateNotifications(icmMulticast.OutputCollection[2], icmDemo3.InputCollection[0]);
            // map columns
            MappColumns(icmDemo4, mcwDemo4);
            MappColumns(icmDemo2, mcwDemo2);
            MappColumns(icmDemo3, mcwDemo3);

            // Add Event Handler

            DtsEventHandler ehOnError = (DtsEventHandler)seqSequence.EventHandlers.Add("OnError");
            Executable execEvent = ehOnError.Executables.Add("STOCK:SQLTASK");
            AddSQLTask(execEvent, "SQLTASKOnError DEMO", "destination", SqlCmd, mevent.Name);


            String SaveLocation = @"c:\\temp";
            SavePackage(SaveLocation);


        }
        public Microsoft.SqlServer.Dts.Runtime.Package GetSetPackage
        {
            get { return myPackage; }
            set { myPackage = value; }

        }
        public String PackageName
        {
            set { myPackage.Name = value; }
        }


        public void AddLogging(String ConnectionName, Boolean Enable)
        {
            LogProvider pkLogging;
            pkLogging = myPackage.LogProviders.Add("DTS.LogProviderSQLServer.2");
            pkLogging.ConfigString = ConnectionName;
            pkLogging.Description = "SQL Server Logging ";
            myPackage.LoggingOptions.SelectedLogProviders.Add(pkLogging);
            myPackage.LoggingOptions.EventFilterKind = Microsoft.SqlServer.Dts.Runtime.DTSEventFilterKind.Inclusion;
            myPackage.LoggingOptions.EventFilter = new String[] { "OnPreExecute", "OnPostExecute", "OnError" };
            switch (Enable)
            {
                case true:
                    myPackage.LoggingMode = Microsoft.SqlServer.Dts.Runtime.DTSLoggingMode.Enabled;
                    break;
                case false:
                    myPackage.LoggingMode = Microsoft.SqlServer.Dts.Runtime.DTSLoggingMode.Disabled;
                    break;
                default:
                    myPackage.LoggingMode = Microsoft.SqlServer.Dts.Runtime.DTSLoggingMode.Disabled;
                    break;

            }
        }
        public void AddOLEDBConnection(String ConnectionName, String ConnectionStr)
        {
            ConnectionManager ConMgr = myPackage.Connections.Add("OLEDB");

            ConMgr.ConnectionString = ConnectionStr + "Packet Size=32076;";
            ConMgr.Name = ConnectionName;
            ConMgr.Description = "SQL OLEDB using " + ConnectionName;

        }
        private void MappColumns(IDTSComponentMetaData100 icmDest, CManagedComponentWrapper mcwDest)
        {
            //mappings

            IDTSInput100 inpDestination = icmDest.InputCollection[0];
            IDTSVirtualInput100 vinpDestination = inpDestination.GetVirtualInput();
            IDTSVirtualInputColumnCollection100 vinpcDestination = (IDTSVirtualInputColumnCollection100)vinpDestination.VirtualInputColumnCollection;


            foreach (IDTSVirtualInputColumn100 vcolumn in vinpDestination.VirtualInputColumnCollection)
            {
                try
                {
                    IDTSInputColumn100 inputColumn = mcwDest.SetUsageType(inpDestination.ID, vinpDestination, vcolumn.LineageID, DTSUsageType.UT_READONLY);
                    // map external column
                    IDTSExternalMetadataColumn100 extColumn = inpDestination.ExternalMetadataColumnCollection[inputColumn.Name];
                    mcwDest.MapInputColumn(inpDestination.ID, inputColumn.ID, extColumn.ID);
                }
                catch
                {
                    IDTSInputColumn100 inputColumn = mcwDest.SetUsageType(inpDestination.ID, vinpDestination, vcolumn.LineageID, DTSUsageType.UT_IGNORED);
                    // map external column

                }

            }


        }
        private IDTSComponentMetaData100 AddComponentMetadata(MainPipe wf, String ComponentName)
        {
            IDTSComponentMetaData100 icmd = wf.ComponentMetaDataCollection.New();
            return icmd;
        }

        private void AddSQLTask(Executable exec, String TaskName, String Connection, String SqlCmd, String VariableName)
        {
            Microsoft.SqlServer.Dts.Runtime.TaskHost sqlTaskHost = (Microsoft.SqlServer.Dts.Runtime.TaskHost)exec;
            //Add Properties
            sqlTaskHost.Properties["Name"].SetValue(sqlTaskHost, TaskName);
            sqlTaskHost.Properties["Description"].SetValue(sqlTaskHost, TaskName);
            sqlTaskHost.Properties["Connection"].SetValue(sqlTaskHost, Connection);
            sqlTaskHost.Properties["SqlStatementSource"].SetValue(sqlTaskHost, SqlCmd);
            IDTSExecuteSQL iexecSqlTask = (IDTSExecuteSQL)sqlTaskHost.InnerObject;
            iexecSqlTask.ResultSetType = ResultSetType.ResultSetType_SingleRow;
            iexecSqlTask.SqlStatementSourceType = SqlStatementSourceType.DirectInput;
            IDTSResultBinding isqlTaskResult = iexecSqlTask.ResultSetBindings.Add();
            isqlTaskResult.DtsVariableName = VariableName;
        }
        private void SavePackage(String DirectoryPath)
        {
            String FullFilePath = DirectoryPath + "\\" + myPackage.Name.ToString() + ".dtsx";
            if (File.Exists(FullFilePath))
            {

                File.Delete(FullFilePath);
                Microsoft.SqlServer.Dts.Runtime.Application app = new Microsoft.SqlServer.Dts.Runtime.Application();
                app.SaveToXml(FullFilePath, myPackage, null);

            }
            else
            {
                Microsoft.SqlServer.Dts.Runtime.Application app = new Microsoft.SqlServer.Dts.Runtime.Application();
                app.SaveToXml(FullFilePath, myPackage, null);

            }
        }

        private void RefreshMetadata(CManagedComponentWrapper mcw)
        {
            //Reinitialize the metadata, Refresh Columns
            mcw.AcquireConnections(null);
            mcw.ReinitializeMetaData();
            mcw.ReleaseConnections();

        }


        private Executable AddSQLExecutable(String MONIKER)
        {
            Executable exec = myPackage.Executables.Add(MONIKER);
            return exec;
        }

        protected void Dispose(bool disposing)
        {
            if (disposing)
            {
                // dispose managed resources
                myPackage.Dispose();
            }
            // free native resources
        }

        public void Dispose()
        {
            Dispose(true);
            GC.SuppressFinalize(this);
        }
    }
}

No comments:

Post a Comment

Contact Form

Name

Email *

Message *