After several days I finally completed the process to move data and and at the same time map the alpha and deltas here is the prototype code.
Now I need to start building a new prototype to load any type of file into a table, the metadata process is all here on the blog.
I wish sometime that I can be more organize on this blog but then I said why?
this code is all for reference used only and hopefully it help some poor soul in its search of better ETL processes.
Eventually most of the code should be self sustain as much as possible , however, most of this code is just to show how it can be done is not as clean as you may want but it works. Now back to keep fighting skynet.
#region Help: Introduction to the script task
/* The Script Task allows you to perform virtually any operation that can be accomplished in
* a .Net application within the context of an Integration Services control flow.
*
* Expand the other regions which have "Help" prefixes for examples of specific ways to use
* Integration Services features within this script task. */
#endregion
#region Namespaces
using System;
using System.Data;
using System.Data.OleDb;
using Microsoft.SqlServer.Dts.Runtime;
using Microsoft.SqlServer.Dts.Pipeline;
using PipelineWrapper = Microsoft.SqlServer.Dts.Pipeline.Wrapper;
using RuntimeWrapper = Microsoft.SqlServer.Dts.Runtime.Wrapper;
using ScriptTask = Microsoft.SqlServer.Dts.Tasks.ScriptTask;
using System.Windows.Forms;
using System.Text;
#endregion
namespace ST_39f12f7077374a399d1056b5e517a2fd
{
/// <summary>
/// ScriptMain is the entry point class of the script. Do not change the name, attributes,
/// or parent of this class.
/// </summary>
[Microsoft.SqlServer.Dts.Tasks.ScriptTask.SSISScriptTaskEntryPointAttribute]
public partial class ScriptMain : Microsoft.SqlServer.Dts.Tasks.ScriptTask.VSTARTScriptObjectModelBase
{
#region Help: Using Integration Services variables and parameters in a script
/* To use a variable in this script, first ensure that the variable has been added to
* either the list contained in the ReadOnlyVariables property or the list contained in
* the ReadWriteVariables property of this script task, according to whether or not your
* code needs to write to the variable. To add the variable, save this script, close this instance of
* Visual Studio, and update the ReadOnlyVariables and
* ReadWriteVariables properties in the Script Transformation Editor window.
* To use a parameter in this script, follow the same steps. Parameters are always read-only.
*
* Example of reading from a variable:
* DateTime startTime = (DateTime) Dts.Variables["System::StartTime"].Value;
*
* Example of writing to a variable:
* Dts.Variables["User::myStringVariable"].Value = "new value";
*
* Example of reading from a package parameter:
* int batchId = (int) Dts.Variables["$Package::batchId"].Value;
*
* Example of reading from a project parameter:
* int batchId = (int) Dts.Variables["$Project::batchId"].Value;
*
* Example of reading from a sensitive project parameter:
* int batchId = (int) Dts.Variables["$Project::batchId"].GetSensitiveValue();
* */
#endregion
#region Help: Firing Integration Services events from a script
/* This script task can fire events for logging purposes.
*
* Example of firing an error event:
* Dts.Events.FireError(18, "Process Values", "Bad value", "", 0);
*
* Example of firing an information event:
* Dts.Events.FireInformation(3, "Process Values", "Processing has started", "", 0, ref fireAgain)
*
* Example of firing a warning event:
* Dts.Events.FireWarning(14, "Process Values", "No values received for input", "", 0);
* */
#endregion
#region Help: Using Integration Services connection managers in a script
/* Some types of connection managers can be used in this script task. See the topic
* "Working with Connection Managers Programatically" for details.
*
* Example of using an ADO.Net connection manager:
* object rawConnection = Dts.Connections["Sales DB"].AcquireConnection(Dts.Transaction);
* SqlConnection myADONETConnection = (SqlConnection)rawConnection;
* //Use the connection in some code here, then release the connection
* Dts.Connections["Sales DB"].ReleaseConnection(rawConnection);
*
* Example of using a File connection manager
* object rawConnection = Dts.Connections["Prices.zip"].AcquireConnection(Dts.Transaction);
* string filePath = (string)rawConnection;
* //Use the connection in some code here, then release the connection
* Dts.Connections["Prices.zip"].ReleaseConnection(rawConnection);
* */
#endregion
/// <summary>
/// This method is called when this script task executes in the control flow.
/// Before returning from this method, set the value of Dts.TaskResult to indicate success or failure.
/// To open Help, press F1.
/// </summary>
#region Global Variables
//Global variables
//Global Variables
Package myPackage = new Package();
Microsoft.SqlServer.Dts.Runtime.Application myApplication = new Microsoft.SqlServer.Dts.Runtime.Application();
Variables vars = null;
String SaveDirectory = "";
#endregion
public void Main()
{
// TODO: Add your code here
try
{
//Reference Variables
Dts.VariableDispenser.LockForRead("User::objColumnInfo");
Dts.VariableDispenser.LockForRead("User::strFileName");
Dts.VariableDispenser.LockForRead("User::strTabName");
Dts.VariableDispenser.LockForRead("User::strTableName");
Dts.VariableDispenser.LockForRead("User::strDatabaseName");
Dts.VariableDispenser.LockForRead("User::strServerName");
Dts.VariableDispenser.LockForRead("User::strBatchId");
Dts.VariableDispenser.LockForRead("User::strSaveDirectory");
Dts.VariableDispenser.LockForRead("User::strSqlCmd");
Dts.VariableDispenser.LockForRead("User::intBDELoaderID");
Dts.VariableDispenser.LockForWrite("User::intBDETotalRecords");
Dts.VariableDispenser.LockForRead("User::bolHasDelta");
Dts.VariableDispenser.LockForRead("User::strDeltaColumn");
//Get variable collector
Dts.VariableDispenser.GetVariables(ref vars);
vars["User::intBDETotalRecords"].Value = 0;
int BDELoaderId = (int)vars["User::intBDELoaderID"].Value;
String TableName = vars["User::strTableName"].Value.ToString();
myPackage.Name = "SsisMaster2Stage_" + TableName;
String MasterTableName = "[Master].["+TableName+"]";
String StageTableName = "[Stage].["+TableName+"]";
//Add Count Variable to Package
Microsoft.SqlServer.Dts.Runtime.Variable intRowCount = myPackage.Variables.Add("intRowCount", false, "User", 0);
Microsoft.SqlServer.Dts.Runtime.Variable intAddCount = myPackage.Variables.Add("intAddRowCount", false, "User", 0);
Microsoft.SqlServer.Dts.Runtime.Variable intChangeCount = myPackage.Variables.Add("intUpdRowCount", false, "User", 0);
Microsoft.SqlServer.Dts.Runtime.Variable intDeleteCount = myPackage.Variables.Add("intDeleteRowCount", false, "User", 0);
Microsoft.SqlServer.Dts.Runtime.Variable intBDELoaderId = myPackage.Variables.Add("intBDELoaderId", false, "User", BDELoaderId);
//SourceSQLCmd Qrty:
StringBuilder SourceSQLCmd = new StringBuilder();
StringBuilder LookupSQLCmd = new StringBuilder();
String DeltaColumns = vars["User::strDeltaColumn"].Value.ToString();
LookupSQLCmd.Append("SELECT ");
LookupSQLCmd.Append(DeltaColumns);
LookupSQLCmd.Append(" FROM ");
LookupSQLCmd.Append(StageTableName);
DataTable ColumnInfo = ReadColumnInfo(vars["User::objColumnInfo"].Value);
SourceSQLCmd.Append("SELECT ");
SourceSQLCmd.Append(CreateSelectCmd(ColumnInfo));
SourceSQLCmd.Append(" FROM ");
SourceSQLCmd.Append(MasterTableName);
SourceSQLCmd.Append(" WHERE BDELoaderId = ");
SourceSQLCmd.Append(vars["User::intBDELoaderID"].Value.ToString());
SaveDirectory = vars["User::strSaveDirectory"].Value.ToString();
// Joins Variables
PipelineWrapper.IDTSPath100 SourceToCount;
PipelineWrapper.IDTSPath100 Count2Lookup;
PipelineWrapper.IDTSPath100 Lookup2ConditionalSplit;
PipelineWrapper.IDTSPath100 ConditionalSplit2AddCount;
PipelineWrapper.IDTSPath100 ConditionalSplit2ChangeCount;
PipelineWrapper.IDTSPath100 AddCopunt2StageDestination;
//Expression Strings
String ConditionalAdd = "[DELTA_FILE_BYTE]==\"A\"";
String ConditionaldUpdate = "([DELTA_FILE_BYTE]==\"C\")||([DELTA_FILE_BYTE]==\"D\")";
String ConditionalColumnName = "DELTA_FILE_BYTE";
//ConditionalSplitOUtputs
PipelineWrapper.IDTSOutput100 AddOutput;
PipelineWrapper.IDTSOutput100 ChangeOutput;
//Create OLEDB Connection
ConnectionManager destination = AddOLEDBConnection("destination", Dts.Connections["host"].ConnectionString.ToString());
ConnectionManager source = AddOLEDBConnection("source", Dts.Connections["host"].ConnectionString.ToString());
ConnectionManager HOST = AddOLEDBConnection("host", Dts.Connections["host"].ConnectionString.ToString());
ConnectionManager MSDB = AddOLEDBConnection("msdb", Dts.Connections["msdb"].ConnectionString.ToString());
//Add Logging reference
AddLogging(MSDB.Name, true);
//Add Sequence Container
Executable execSequence = AddSQLExecutable("STOCK:Sequence");
Microsoft.SqlServer.Dts.Runtime.Sequence seqSequence = (Microsoft.SqlServer.Dts.Runtime.Sequence)execSequence;
//Sequence Properties
seqSequence.Name = "Master2StageProcess";
seqSequence.LoggingMode = Microsoft.SqlServer.Dts.Runtime.DTSLoggingMode.UseParentSetting;
seqSequence.Description = "Delta Record process";
// Add WorkFlow Component
Executable execDataFlow = seqSequence.Executables.Add("STOCK:PipelineTask");
Microsoft.SqlServer.Dts.Runtime.TaskHost thDataFlow = (Microsoft.SqlServer.Dts.Runtime.TaskHost)execDataFlow;
thDataFlow.Name = "Master2Stage";
PipelineWrapper.MainPipe DataFlow = (PipelineWrapper.MainPipe)thDataFlow.InnerObject;
//Add Source Componnent
PipelineWrapper.IDTSComponentMetaData100 icmdSource = AddComponentMetadata(DataFlow, "Source");
icmdSource.ComponentClassID="DTSAdapter.OLEDbSource";
//Source Componnent properties
PipelineWrapper.CManagedComponentWrapper mcwSource = icmdSource.Instantiate();
mcwSource.ProvideComponentProperties();
icmdSource.Name = "Master" + TableName;
icmdSource.RuntimeConnectionCollection[0].ConnectionManagerID = source.ID;
icmdSource.RuntimeConnectionCollection[0].ConnectionManager = DtsConvert.GetExtendedInterface(source);
//Source Access Mode
mcwSource.SetComponentProperty("AccessMode", 2);//SQLCmd
mcwSource.SetComponentProperty("SqlCommand", SourceSQLCmd.ToString());
//mcwSource.SetComponentProperty("AccessMode", 0); //TableOrView
//mcwSource.SetComponentProperty("OpenRowset", SourceSchTable);
Reinitiaze(mcwSource);
//Add RowCount Component
PipelineWrapper.IDTSComponentMetaData100 icmdRowCount = AddComponentMetadata(DataFlow, "RowCountComponent");
icmdRowCount.ComponentClassID = "DTSTransform.RowCount";
PipelineWrapper.CManagedComponentWrapper mcwRowCount = icmdRowCount.Instantiate();
mcwRowCount.ProvideComponentProperties();
icmdRowCount.Name = "TotalRecordCount";
mcwRowCount.SetComponentProperty("VariableName", "User::intRowCount");
SourceToCount = DataFlow.PathCollection.New();
SourceToCount.AttachPathAndPropagateNotifications(icmdSource.OutputCollection[0], icmdRowCount.InputCollection[0]);
Reinitiaze(mcwRowCount);
//Add Other Counts
PipelineWrapper.IDTSComponentMetaData100 icmdRowCountAdd = AddComponentMetadata(DataFlow, "RowCountComponent_Add");
icmdRowCountAdd.ComponentClassID = "DTSTransform.RowCount";
PipelineWrapper.CManagedComponentWrapper mcwRowCountAdd = icmdRowCountAdd.Instantiate();
mcwRowCountAdd.ProvideComponentProperties();
icmdRowCountAdd.Name = "NewRecordCount";
mcwRowCountAdd.SetComponentProperty("VariableName", "User::intAddRowCount");
PipelineWrapper.IDTSComponentMetaData100 icmdRowCountUpd = AddComponentMetadata(DataFlow, "RowCountComponent_UPD");
icmdRowCountUpd.ComponentClassID = "DTSTransform.RowCount";
PipelineWrapper.CManagedComponentWrapper mcwRowCountUpd = icmdRowCountUpd.Instantiate();
mcwRowCountUpd.ProvideComponentProperties();
icmdRowCountUpd.Name = "UpdateRecordCount";
mcwRowCountUpd.SetComponentProperty("VariableName", "User::intUpdRowCount");
Reinitiaze(mcwRowCountAdd);
// Join Source and RowCount ( Total Count )
//Add Lookup Component
PipelineWrapper.IDTSComponentMetaData100 icmdLookup = AddComponentMetadata(DataFlow, "Delta Lookup");
icmdLookup.ComponentClassID = "{671046B0-AA63-4C9F-90E4-C06E0B710CE3}";//"DTSTransform.Lookup.2";
PipelineWrapper.CManagedComponentWrapper mcwLookup = icmdLookup.Instantiate();
mcwLookup.ProvideComponentProperties();
//Join Count to Lookup
Count2Lookup = DataFlow.PathCollection.New();
Count2Lookup.AttachPathAndPropagateNotifications(icmdRowCount.OutputCollection[0], icmdLookup.InputCollection[0]);
icmdLookup.Name = "DeltaLookup";
//Set Connection
icmdLookup.RuntimeConnectionCollection[0].ConnectionManagerID = destination.ID;
icmdLookup.RuntimeConnectionCollection[0].ConnectionManager = DtsConvert.GetExtendedInterface(destination);
//Lookup Cache Type - Full = 0, Partial = 1, None = 2
mcwLookup.SetComponentProperty("CacheType", 0);//Full
mcwLookup.SetComponentProperty("SqlCommand", LookupSQLCmd.ToString());
mcwLookup.SetComponentProperty("NoMatchBehavior", 1); // Redirect Not Match output
Reinitiaze(mcwLookup);
//Join Count to Lookup
MappColumnsLookup(icmdLookup, mcwLookup, DeltaColumns);
Reinitiaze(mcwLookup);
//Add Conditional Split Add
PipelineWrapper.IDTSComponentMetaData100 icmConditionalSplit = AddComponentMetadata(DataFlow, "DeltaFileFlatAdd");
icmConditionalSplit.ComponentClassID = "DTSTransform.ConditionalSplit";
PipelineWrapper.CManagedComponentWrapper cmwConditionalSpit = icmConditionalSplit.Instantiate();
cmwConditionalSpit.ProvideComponentProperties();
icmConditionalSplit.Name = "NoMatchOption";
Reinitiaze(cmwConditionalSpit);
//Join Lookup Not Match 2 ConditionalSplit ( need to change to the Match not match stuff)
Lookup2ConditionalSplit = DataFlow.PathCollection.New();
Lookup2ConditionalSplit.AttachPathAndPropagateNotifications(icmdLookup.OutputCollection[1], icmConditionalSplit.InputCollection[0]);//No Match Output
AddOutput = ConditinalSplitOutput(cmwConditionalSpit, icmConditionalSplit, "Add", ConditionalAdd, ConditionalColumnName);
//Conditional Split Change or Delete
//Add Conditional Split Add
PipelineWrapper.IDTSComponentMetaData100 icmConditionalSplit2 = AddComponentMetadata(DataFlow, "DeltaFileFlatUpd");
icmConditionalSplit2.ComponentClassID = "DTSTransform.ConditionalSplit";
PipelineWrapper.CManagedComponentWrapper cmwConditionalSpit2 = icmConditionalSplit2.Instantiate();
cmwConditionalSpit2.ProvideComponentProperties();
icmConditionalSplit2.Name = "MacthOPTION";
Reinitiaze(cmwConditionalSpit2);
//Join Lookup Match 2 ConditionalSplit
ConditionalSplit2ChangeCount = DataFlow.PathCollection.New();
ConditionalSplit2ChangeCount.AttachPathAndPropagateNotifications(icmdLookup.OutputCollection[0], icmConditionalSplit2.InputCollection[0]);
ChangeOutput = ConditinalSplitOutput(cmwConditionalSpit2, icmConditionalSplit2, "Change", ConditionaldUpdate, ConditionalColumnName);
//Add Counts to each output
ConditionalSplit2AddCount = DataFlow.PathCollection.New();
ConditionalSplit2AddCount.AttachPathAndPropagateNotifications(AddOutput,icmdRowCountAdd.InputCollection[0]);
Reinitiaze(mcwRowCountAdd);
ConditionalSplit2ChangeCount = DataFlow.PathCollection.New();
ConditionalSplit2ChangeCount.AttachPathAndPropagateNotifications(ChangeOutput, icmdRowCountUpd.InputCollection[0]);
Reinitiaze(mcwRowCountUpd);
//Add OLEDB Destination Componnent
PipelineWrapper.IDTSComponentMetaData100 icmDestination = AddComponentMetadata(DataFlow, "destination");
icmDestination.ComponentClassID = "DTSAdapter.OLEDBDestination";
icmDestination.ValidateExternalMetadata = true;
PipelineWrapper.CManagedComponentWrapper mcwDestination = icmDestination.Instantiate();
// tblDestination
mcwDestination.ProvideComponentProperties();
icmDestination.Name = "Stage"+ TableName;
icmDestination.Description = "destination";
icmDestination.RuntimeConnectionCollection[0].ConnectionManagerID = destination.ID;
icmDestination.RuntimeConnectionCollection[0].ConnectionManager = DtsConvert.GetExtendedInterface(destination);
mcwDestination.SetComponentProperty("AccessMode", 0);//FastLoad
mcwDestination.SetComponentProperty("OpenRowset", StageTableName);//tblDestination);
mcwDestination.Validate();
mcwDestination.SetComponentProperty("FastLoadKeepIdentity", true);
mcwDestination.SetComponentProperty("FastLoadMaxInsertCommitSize", 10000);//FastLoadMaxCommitSize);
mcwDestination.SetComponentProperty("FastLoadKeepNulls", false);
mcwDestination.SetComponentProperty("FastLoadOptions", "TABLOCK,CHECK_CONSTRAINTS,ROWS_PER_BATCH = 50000");// + RowsPerBatch);
Reinitiaze(mcwDestination);
// Join Look up and destination
AddCopunt2StageDestination = DataFlow.PathCollection.New();
AddCopunt2StageDestination.AttachPathAndPropagateNotifications(icmdRowCountAdd.OutputCollection[0], icmDestination.InputCollection[0]);
MappColumns(icmDestination, mcwDestination);
SaveSSIS(SaveDirectory, myPackage.Name.ToString());
vars["User::intBDETotalRecords"].Value = -99;
vars.Unlock();
Dts.TaskResult = (int)ScriptResults.Success;
}
catch (Exception e)
{
SaveSSIS(SaveDirectory,"Error"+ myPackage.Name.ToString());
throw;
}
}
#region ScriptResults declaration
/// <summary>
/// This enum provides a convenient shorthand within the scope of this class for setting the
/// result of the script.
///
/// This code was generated automatically.
/// </summary>
enum ScriptResults
{
Success = Microsoft.SqlServer.Dts.Runtime.DTSExecResult.Success,
Failure = Microsoft.SqlServer.Dts.Runtime.DTSExecResult.Failure
};
#endregion
#region SubPrograms
private void MappColumnsLookup(PipelineWrapper.IDTSComponentMetaData100 icmLookup, PipelineWrapper.CManagedComponentWrapper mcwLookup,String DeltaColumns)
{
//mappings
PipelineWrapper.IDTSInput100 inputLookup = icmLookup.InputCollection[0];
PipelineWrapper.IDTSInputColumnCollection100 cinputlookup = inputLookup.InputColumnCollection;
PipelineWrapper.IDTSVirtualInput100 vinputLookup = inputLookup.GetVirtualInput();
PipelineWrapper.IDTSVirtualInputColumnCollection100 vinpcDestination = vinputLookup.VirtualInputColumnCollection;
foreach (PipelineWrapper.IDTSVirtualInputColumn100 vcolumn in vinputLookup.VirtualInputColumnCollection)
{
try
{
PipelineWrapper.IDTSInputColumn100 inputColumn = mcwLookup.SetUsageType(inputLookup.ID, vinputLookup, vcolumn.LineageID, PipelineWrapper.DTSUsageType.UT_READONLY);
// map external column
mcwLookup.SetInputColumnProperty(inputLookup.ID, inputColumn.ID, "JoinToReferenceColumn", vcolumn.Name);
// mcwLookup.MapInputColumn(inputLookup.ID, inputColumn.ID, extColumn.ID);
}
catch
{
PipelineWrapper.IDTSInputColumn100 inputColumn = mcwLookup.SetUsageType(inputLookup.ID, vinputLookup, vcolumn.LineageID, PipelineWrapper.DTSUsageType.UT_IGNORED);
throw;
// map external column
}
}
}
private String CreateSelectCmd(DataTable ColumnINfo)
{
StringBuilder SQLCmd = new StringBuilder();
foreach(DataRow row in ColumnINfo.Rows)
{
Int32 FieldLocation = (Int32)row["FieldLocation"];
String FieldLevelName = "";
if (FieldLocation == 1)
{
FieldLevelName = "[BDELOADERID]";
FieldLevelName = FieldLevelName+",["+(String)row["FieldLevelName"]+"]";
SQLCmd.Append(FieldLevelName);
}
else
{
FieldLevelName = ",["+(String)row["FieldLevelName"]+"]";
SQLCmd.Append(FieldLevelName);
}
}
return SQLCmd.ToString();
}
private PipelineWrapper.IDTSOutput100 ConditinalSplitOutput(PipelineWrapper.CManagedComponentWrapper mcwConditionalSplit, PipelineWrapper.IDTSComponentMetaData100 icmdConditionalSplit, String OutputNanme, String ConditionExpression, String ConditionalColumnName)
{
PipelineWrapper.IDTSOutput100 Output = mcwConditionalSplit.InsertOutput(PipelineWrapper.DTSInsertPlacement.IP_BEFORE, icmdConditionalSplit.OutputCollection[0].ID);
Output.Name = OutputNanme;
Output.Description = OutputNanme + " Records";
//Set virtual inputs
PipelineWrapper.IDTSInput100 ConditionalInput = icmdConditionalSplit.InputCollection[0];
PipelineWrapper.IDTSVirtualInput100 vConditionalInput = ConditionalInput.GetVirtualInput();
PipelineWrapper.IDTSVirtualInputColumnCollection100 vcConditionalInputs = vConditionalInput.VirtualInputColumnCollection;
//Set Columns Mapping and buffer
foreach (PipelineWrapper.IDTSVirtualInputColumn100 vcolumn in vConditionalInput.VirtualInputColumnCollection)
{
String ColumnName = vcolumn.Name.ToLower();
if (ColumnName == ConditionalColumnName.ToLower())
{
PipelineWrapper.IDTSInputColumn100 col = mcwConditionalSplit.SetUsageType(ConditionalInput.ID, vConditionalInput, vcolumn.LineageID, PipelineWrapper.DTSUsageType.UT_READONLY);
mcwConditionalSplit.SetOutputProperty(Output.ID, "FriendlyExpression", ConditionExpression);
}
}
return Output;
}
private void AddLogging(String ConnectionName, Boolean Enable)
{
LogProvider pkLogging;
pkLogging = myPackage.LogProviders.Add("DTS.LogProviderSQLServer");
pkLogging.Name = "Log Provider For SQL SERVER";
pkLogging.Description = "Log Provider For SQL SERVER";
pkLogging.ConfigString = ConnectionName;
myPackage.LoggingOptions.SelectedLogProviders.Add(pkLogging);
myPackage.LoggingOptions.EventFilterKind = Microsoft.SqlServer.Dts.Runtime.DTSEventFilterKind.Inclusion;
myPackage.LoggingOptions.EventFilter = new String[] { "OnPreExecute", "OnPostExecute", "OnError" };
switch (Enable)
{
case true:
myPackage.LoggingMode = Microsoft.SqlServer.Dts.Runtime.DTSLoggingMode.Enabled;
break;
case false:
myPackage.LoggingMode = Microsoft.SqlServer.Dts.Runtime.DTSLoggingMode.Disabled;
break;
default:
myPackage.LoggingMode = Microsoft.SqlServer.Dts.Runtime.DTSLoggingMode.Disabled;
break;
}
}
/*
* Add Destinatin Componnet
*/
private PipelineWrapper.IDTSComponentMetaData100 AddComponentMetadata(PipelineWrapper.MainPipe wf, String ComponentName)
{
PipelineWrapper.IDTSComponentMetaData100 icmd = wf.ComponentMetaDataCollection.New();
return icmd;
}
private void SaveSSIS(String Directory, String PackageName)
{
String FullFilePath = Directory + "\\" + PackageName + ".dtsx";
myApplication.SaveToXml(FullFilePath, myPackage, null);
}
/* Read object Columns into a datatable*/
private DataTable ReadColumnInfo(object ColumnInfo)
{
OleDbDataAdapter oleDb = new OleDbDataAdapter();
DataTable dt = new DataTable();
oleDb.Fill(dt, ColumnInfo);
return dt;
}
/*
* Reinitiazied
*/
private void Reinitiaze(PipelineWrapper.CManagedComponentWrapper InstanceSource)
{
//Reinitialize Flat File source metadata,
InstanceSource.AcquireConnections(null);
InstanceSource.ReinitializeMetaData();
InstanceSource.ReleaseConnections();
}
/*
* Add OLEDB Connection
*/
public ConnectionManager AddOLEDBConnection(String ConnectionName, String ConnectionStr)
{
ConnectionManager ConMgr = myPackage.Connections.Add("OLEDB");
ConMgr.ConnectionString = ConnectionStr + "Packet Size=32076;";
ConMgr.Name = ConnectionName;
ConMgr.Description = "SQL OLEDB using " + ConnectionName;
return ConMgr;
}
/*
* Add WorkFlow
*
*/
private PipelineWrapper.MainPipe AddWorkFlowComponent(String ComponentName)
{
myPackage.Executables.Add("STOCK:PipelineTask");
TaskHost _TaskHost = (TaskHost)myPackage.Executables[0];
PipelineWrapper.MainPipe dataFlowTask = (PipelineWrapper.MainPipe)_TaskHost.InnerObject;
_TaskHost.Name = ComponentName;
_TaskHost.Properties["DefaultBufferMaxRows"].SetValue(_TaskHost, "1000000");
return dataFlowTask;
}
// Create SSIS executable
private Executable AddSQLExecutable(String MONIKER)
{
Executable exec = myPackage.Executables.Add(MONIKER);
return exec;
}
private void MappColumns(PipelineWrapper.IDTSComponentMetaData100 icmDest, PipelineWrapper.CManagedComponentWrapper mcwDest)
{
//mappings
PipelineWrapper.IDTSInput100 inpDestination = icmDest.InputCollection[0];
PipelineWrapper.IDTSVirtualInput100 vinpDestination = inpDestination.GetVirtualInput();
PipelineWrapper.IDTSVirtualInputColumnCollection100 vinpcDestination = (PipelineWrapper.IDTSVirtualInputColumnCollection100)vinpDestination.VirtualInputColumnCollection;
foreach (PipelineWrapper.IDTSVirtualInputColumn100 vcolumn in vinpDestination.VirtualInputColumnCollection)
{
try
{
if (vcolumn.DataType != RuntimeWrapper.DataType.DT_BYTES)
{
PipelineWrapper.IDTSInputColumn100 inputColumn = mcwDest.SetUsageType(inpDestination.ID, vinpDestination, vcolumn.LineageID, PipelineWrapper.DTSUsageType.UT_READONLY);
// map external column
PipelineWrapper.IDTSExternalMetadataColumn100 extColumn = inpDestination.ExternalMetadataColumnCollection[inputColumn.Name];
mcwDest.MapInputColumn(inpDestination.ID, inputColumn.ID, extColumn.ID);
}
else
{
PipelineWrapper.IDTSInputColumn100 inputColumn = mcwDest.SetUsageType(inpDestination.ID, vinpDestination, vcolumn.LineageID, PipelineWrapper.DTSUsageType.UT_IGNORED);
}
}
catch
{
PipelineWrapper.IDTSInputColumn100 inputColumn = mcwDest.SetUsageType(inpDestination.ID, vinpDestination, vcolumn.LineageID, PipelineWrapper.DTSUsageType.UT_IGNORED);
// map external column
}
}
}
#endregion
}
}
No comments:
Post a Comment