using System;
using System.Collections;
using Microsoft.SqlServer.Dts.Pipeline;
using Microsoft.SqlServer.Dts.Pipeline.Wrapper;
using Microsoft.SqlServer.Dts.Runtime.Wrapper;
using Microsoft.SqlServer.Dts.Runtime;
using Microsoft.SqlServer.Dts;
using System.Threading.Tasks;
using System.Collections.Generic;
namespace CustomComponents
{
/// <summary>
/// SSIS data-flow transformation that cleans every string column
/// (DT_STR / DT_WSTR) of the rows passing through it: embedded NUL characters
/// are replaced with spaces, surrounding whitespace is trimmed, and values
/// that end up empty are set to NULL.
/// </summary>
[DtsPipelineComponent
(
    DisplayName = "TrimThatColumn",
    ComponentType = ComponentType.Transform,
    IconResource = "TrimThatColumn.TrimThatColumn.ico"
)
]
public class TrimThatColumn : PipelineComponent
{
    /// <summary>
    /// Sets the design-time defaults of the component: names its first input
    /// and first output and fills in the description and contact information.
    /// </summary>
    public override void ProvideComponentProperties()
    {
        // The base call runs first; the indexers below rely on it having
        // created the first input and the first output.
        base.ProvideComponentProperties();

        ComponentMetaData.InputCollection[0].Name = "Input";
        ComponentMetaData.OutputCollection[0].Name = "Output";
        ComponentMetaData.Description = "Trim All Columns from a source";
        ComponentMetaData.ContactInfo = "Jorge Novo";
    }

    /// <summary>
    /// Refreshes the component metadata by delegating to the base implementation.
    /// </summary>
    public override void ReinitializeMetaData()
    {
        // This should delete orphaned columns for us, calling OnDeletingInputColumn
        base.ReinitializeMetaData();
    }

    /// <summary>
    /// Called once before execution. This component needs no per-execution
    /// setup: the string columns are discovered from each buffer inside
    /// <see cref="ProcessInput"/>.
    /// </summary>
    public override void PreExecute()
    {
        // Intentionally empty.
    }

    /// <summary>
    /// Cleans the string columns of every row in the supplied buffer. Rows
    /// arrive in batches, so all rows of the buffer are processed in one call.
    /// </summary>
    /// <param name="inputID">ID of the input the buffer belongs to (not used).</param>
    /// <param name="Buffer">Buffer whose rows are modified in place.</param>
    public override void ProcessInput(int inputID, PipelineBuffer Buffer)
    {
        // The column lookup takes only a column index, not a row, so the set
        // of string columns is computed once per buffer instead of per row.
        int columnCount = Buffer.ColumnCount;
        List<int> stringColumns = new List<int>(columnCount);
        for (int i = 0; i < columnCount; i++)
        {
            DataType type = Buffer.GetColumnInfo(i).DataType;

            // Only operate on columns of type DT_STR or DT_WSTR.
            if (type == DataType.DT_STR || type == DataType.DT_WSTR)
            {
                stringColumns.Add(i);
            }
        }

        while (Buffer.NextRow())
        {
            foreach (int columnIndex in stringColumns)
            {
                // A NULL column has nothing to trim; reading its value and
                // calling a method on it would fail, so leave it as NULL.
                if (Buffer.IsNull(columnIndex))
                {
                    continue;
                }

                // Replace NUL characters BEFORE trimming: Trim() does not strip
                // '\0', so doing it the other way round leaves the substituted
                // spaces at the ends of the value.
                string cleaned = Buffer.GetString(columnIndex).Replace('\0', ' ').Trim();

                if (cleaned.Length == 0)
                {
                    // Nothing left after cleaning: store NULL rather than an empty string.
                    Buffer.SetNull(columnIndex);
                }
                else
                {
                    Buffer.SetString(columnIndex, cleaned);
                }
            }
        }
    }

    /// <summary>
    /// Raises <paramref name="currentStatus"/> to <paramref name="newStatus"/>
    /// when the new status is more severe; otherwise leaves it unchanged.
    /// Severity increases from valid, to broken, to needs-new-metadata, to corrupt.
    /// </summary>
    /// <param name="currentStatus">Status accumulated so far; updated in place.</param>
    /// <param name="newStatus">Status to merge into the current one.</param>
    /// <returns>The resulting value of <paramref name="currentStatus"/>.</returns>
    /// <exception cref="System.ApplicationException">
    /// Either status is not one of the four known enumeration values.
    /// </exception>
    private DTSValidationStatus promoteStatus(ref DTSValidationStatus currentStatus, DTSValidationStatus newStatus)
    {
        // Ranking both values up front also validates them; a bad status,
        // if any, is the result of a programming error.
        int currentRank = GetSeverityRank(currentStatus);
        int newRank = GetSeverityRank(newStatus);

        if (newRank > currentRank)
        {
            currentStatus = newStatus;
        }

        return currentStatus;
    }

    /// <summary>
    /// Maps a validation status to its position in the severity order used by
    /// <see cref="promoteStatus"/> (higher means more severe).
    /// </summary>
    private static int GetSeverityRank(DTSValidationStatus status)
    {
        switch (status)
        {
            case DTSValidationStatus.VS_ISVALID:
                return 0;
            case DTSValidationStatus.VS_ISBROKEN:
                return 1;
            case DTSValidationStatus.VS_NEEDSNEWMETADATA:
                return 2;
            case DTSValidationStatus.VS_ISCORRUPT:
                return 3;
            default:
                throw new System.ApplicationException("Internal Error: A value outside the scope of the status enumeration was found.");
        }
    }
}
}
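// NOTE: the web-page text "No comments: / Post a Comment" that followed the code
// was not part of the source; it is kept here only as a comment so the file compiles.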