September 16, 2014

Custom Component TrimThatColumn

using System;
using System.Collections;
using Microsoft.SqlServer.Dts.Pipeline;
using Microsoft.SqlServer.Dts.Pipeline.Wrapper;
using Microsoft.SqlServer.Dts.Runtime.Wrapper;
using Microsoft.SqlServer.Dts.Runtime;
using Microsoft.SqlServer.Dts;
using System.Threading.Tasks;
using System.Collections.Generic;

namespace CustomComponents
{
    [DtsPipelineComponent
        (
            DisplayName = "TrimThatColumn",
            ComponentType = ComponentType.Transform,
            IconResource = "TrimThatColumn.TrimThatColumn.ico"
        )
    ]
    public class TrimThatColumn : PipelineComponent
    {
        //public const int PARALLEL_THD = 4;
       // ArrayList indexesToTrim = new ArrayList();
      //  ArrayList indexesToCopy = new ArrayList();
      //  object[] indexesToTrimArray;
      //  bool userParallelism = false;

        public override void ProvideComponentProperties()
        {
            base.ProvideComponentProperties();
            ComponentMetaData.InputCollection[0].Name = "Input";
            ComponentMetaData.OutputCollection[0].Name = "Output";
            ComponentMetaData.Description = "Trim All Columns from a source";
            ComponentMetaData.ContactInfo = "Jorge Novo";
        }

        public override void ReinitializeMetaData()
        {
            // This should delete orphaned columns for us, calling OnDeletingInputColumn
            base.ReinitializeMetaData();
        }

        /// <summary>
        /// This method is called once before execution. We set up this component by choosing which strategy to use when processisng input:
        /// If we have less than PARALLEL_THD columns as inputs, we'll use a serial approach. Otherwise we'll use parallelism.
        /// </summary>
        public override void PreExecute()
        {
            IDTSInput100 input = ComponentMetaData.InputCollection[0];
            IDTSInputColumnCollection100 inputColumns = input.InputColumnCollection;

            //foreach (IDTSInputColumn100 column in inputColumns)
            //{
            //    if (column.DataType == DataType.DT_STR || column.DataType == DataType.DT_WSTR)
            //    {
            //        indexesToTrim.Add((int)BufferManager.FindColumnByLineageID(input.Buffer, column.LineageID));
            //    }
            //    else
            //        indexesToCopy.Add((int)BufferManager.FindColumnByLineageID(input.Buffer, column.LineageID));
            //}

            //if (indexesToTrim.Count > PARALLEL_THD)
            //    userParallelism = true;
            //else
            //    userParallelism = false;

            //indexesToTrimArray = indexesToTrim.ToArray();
        }

        /// <summary>
        /// This method will process each row at once. The .NET framework uses to collect some rows in a buffer and pass the whole buffer to the
        /// function. So we have to process multiple rows at once.
        /// </summary>
        /// <param name="inputID"></param>
        /// <param name="buffer"></param>
        public override void ProcessInput(int inputID, PipelineBuffer Buffer)
        {


            while (Buffer.NextRow())
            {

                var cols = Buffer.ColumnCount;

                //loop through each column

                for (var i = 0; i < cols; i++)
                {

                    //get datatype of the column

                    var type = Buffer.GetColumnInfo(i).DataType;



                    //only operate on columns of type DT_STR or DT_WSTR

                    if (type == DataType.DT_STR || type == DataType.DT_WSTR)
                    {

                        //grab the data from the column and trim it

                        var colData = Buffer[i].ToString().Trim().Replace(Convert.ToChar(0x0).ToString()," ");

                        if (string.IsNullOrEmpty(colData))
                        {

                            //if the trimmed data is blank or null, set the output to null

                            Buffer.SetNull(i);

                        }

                        else
                        {

                            //if the column has data, set the output to the trimmed version

                            Buffer.SetString(i, colData);

                        }

                    }

                }
            }
        }
        private DTSValidationStatus promoteStatus(ref DTSValidationStatus currentStatus, DTSValidationStatus newStatus)
        {
            // statuses are ranked in order of increasing severity, from
            //   valid to broken to needsnewmetadata to corrupt.
            // bad status, if any, is result of programming error
            switch (currentStatus)
            {
                case DTSValidationStatus.VS_ISVALID:
                    switch (newStatus)
                    {
                        case DTSValidationStatus.VS_ISBROKEN:
                        case DTSValidationStatus.VS_ISCORRUPT:
                        case DTSValidationStatus.VS_NEEDSNEWMETADATA:
                            currentStatus = newStatus;
                            break;
                        case DTSValidationStatus.VS_ISVALID:
                            break;
                        default:
                            throw new System.ApplicationException("Internal Error: A value outside the scope of the status enumeration was found.");
                    }
                    break;
                case DTSValidationStatus.VS_ISBROKEN:
                    switch (newStatus)
                    {
                        case DTSValidationStatus.VS_ISCORRUPT:
                        case DTSValidationStatus.VS_NEEDSNEWMETADATA:
                            currentStatus = newStatus;
                            break;
                        case DTSValidationStatus.VS_ISVALID:
                        case DTSValidationStatus.VS_ISBROKEN:
                            break;
                        default:
                            throw new System.ApplicationException("Internal Error: A value outside the scope of the status enumeration was found.");
                    }
                    break;
                case DTSValidationStatus.VS_NEEDSNEWMETADATA:
                    switch (newStatus)
                    {
                        case DTSValidationStatus.VS_ISCORRUPT:
                            currentStatus = newStatus;
                            break;
                        case DTSValidationStatus.VS_ISVALID:
                        case DTSValidationStatus.VS_ISBROKEN:
                        case DTSValidationStatus.VS_NEEDSNEWMETADATA:
                            break;
                        default:
                            throw new System.ApplicationException("Internal Error: A value outside the scope of the status enumeration was found.");
                    }
                    break;
                case DTSValidationStatus.VS_ISCORRUPT:
                    switch (newStatus)
                    {
                        case DTSValidationStatus.VS_ISCORRUPT:
                        case DTSValidationStatus.VS_ISVALID:
                        case DTSValidationStatus.VS_ISBROKEN:
                        case DTSValidationStatus.VS_NEEDSNEWMETADATA:
                            break;
                        default:
                            throw new System.ApplicationException("Internal Error: A value outside the scope of the status enumeration was found.");
                    }
                    break;
                default:
                    throw new System.ApplicationException("Internal Error: A value outside the scope of the status enumeration was found.");
            }
            return currentStatus;
        }
    }
}

No comments:

Post a Comment

Contact Form

Name

Email *

Message *