OK, we can extract data from Postgres, but what happens when those tables have millions of records and you still need to use a DataTable for it?
Well, the solution is to create two queries. The first query gets the first 50,000 records (the limit); then you extract the max ID from the DataTable, clear the flag that marks the first run, and loop 50k at a time with the second query until the extracted row count is less than the limit... Here it is... :-)
/*
Microsoft SQL Server Integration Services Script Task
Write scripts using Microsoft Visual C# 2008.
The ScriptMain is the entry point class of the script.
*/
using System;
using Npgsql;
using System.Data;
using System.IO;
using System.Text;
using Microsoft.SqlServer.Dts.Runtime;
using System.Windows.Forms;
namespace ST_b7b6f2660bba404bb3315fc71b301f0e.csproj
{
[System.AddIn.AddIn("ScriptMain", Version = "1.0", Publisher = "", Description = "")]
public partial class ScriptMain : Microsoft.SqlServer.Dts.Tasks.ScriptTask.VSTARTScriptObjectModelBase
{
#region VSTA generated code
/// <summary>
/// Outcomes this script assigns to Dts.TaskResult; each member carries the
/// numeric value of the matching SSIS DTSExecResult member.
/// </summary>
enum ScriptResults
{
    // DTSExecResult resolves through the file-level
    // "using Microsoft.SqlServer.Dts.Runtime;" directive.
    Success = DTSExecResult.Success,
    Failure = DTSExecResult.Failure
};
#endregion
/*
The execution engine calls this method when the task executes.
To access the object model, use the Dts property. Connections, variables, events,
and logging features are available as members of the Dts property as shown in the following examples.
To reference a variable, call Dts.Variables["MyCaseSensitiveVariableName"].Value;
To post a log entry, call Dts.Log("This is my log text", 999, null);
To fire an event, call Dts.Events.FireInformation(99, "test", "hit the help message", "", 0, true);
To use the connections collection use something like the following:
ConnectionManager cm = Dts.Connections.Add("OLEDB");
cm.ConnectionString = "Data Source=localhost;Initial Catalog=AdventureWorks;Provider=SQLNCLI10;Integrated Security=SSPI;Auto Translate=False;";
Before returning from this method, set the value of Dts.TaskResult to indicate success or failure.
To open Help, press F1.
*/
Variables vars = null;
/// <summary>
/// Script task entry point. Locks the SSIS variables, builds the output file
/// path "&lt;strImportDirectory5&gt;&lt;strfTableName&gt;_&lt;strBatchId&gt;_&lt;intPromoBatchId&gt;.txt",
/// runs the batched PostgreSQL export and reports the outcome in Dts.TaskResult.
/// </summary>
public void Main()
{
    // SSIS variables. Some are read here; the rest are read by
    // Generate_PostGres_File through the shared vars field.
    Dts.VariableDispenser.LockForRead("User::strBatchId");
    Dts.VariableDispenser.LockForRead("User::strfTableName");
    Dts.VariableDispenser.LockForRead("User::intPromoBatchId");
    Dts.VariableDispenser.LockForRead("User::strSqlCmd");
    Dts.VariableDispenser.LockForRead("User::strFileDirectoryPath");
    Dts.VariableDispenser.LockForRead("User::strPostgresCon5");
    Dts.VariableDispenser.LockForRead("User::strImportDirectory5");
    Dts.VariableDispenser.LockForRead("User::intExtractCount");
    Dts.VariableDispenser.LockForRead("User::bolFirstBatch");
    Dts.VariableDispenser.LockForRead("User::strSqlCmdSub");
    Dts.VariableDispenser.GetVariables(ref vars);

    Boolean isSuccess = false;
    try
    {
        int PromoBatchId = (int)vars["User::intPromoBatchId"].Value;
        String FileName = vars["User::strfTableName"].Value.ToString() + "_" + vars["User::strBatchId"].Value.ToString() + "_";
        FileName = FileName + PromoBatchId.ToString() + ".txt";
        String Delimiter = "|";
        // Plain concatenation: the directory variable is expected to end with a
        // path separator. NOTE(review): confirm, or switch to Path.Combine.
        String fullFilePath = vars["User::strImportDirectory5"].Value.ToString() + FileName;
        String sqlcmd = vars["User::strSqlCmd"].Value.ToString();
        String ConnStr = vars["User::strPostgresCon5"].Value.ToString();

        isSuccess = Generate_PostGres_File(ConnStr, sqlcmd, fullFilePath, PromoBatchId, Delimiter);
    }
    finally
    {
        // Release the variable locks even when reading a variable or the export throws.
        vars.Unlock();
    }

    // Report the real outcome; previously the task always reported Success,
    // even when the export returned false.
    if (isSuccess)
    {
        Dts.TaskResult = (int)ScriptResults.Success;
    }
    else
    {
        Dts.TaskResult = (int)ScriptResults.Failure;
    }
}
/*
* Get Header File
*/
/// <summary>
/// Builds the header line of the export file from the column names of a table.
/// </summary>
/// <param name="dt">Table whose column names become the header fields. It is only read, never reassigned.</param>
/// <param name="FileDelimiter">Separator written between column names; may be longer than one character.</param>
/// <returns>
/// The column names joined by the delimiter and terminated by Environment.NewLine.
/// For a table without columns the result is just the newline.
/// </returns>
private String GetFileHeader(ref DataTable dt, String FileDelimiter)
{
    String qoute = "";
    StringBuilder HeaderFile = new StringBuilder();
    Boolean firstColumn = true;
    foreach (DataColumn col in dt.Columns)
    {
        // Write the delimiter between fields. The previous approach prepended it
        // and stripped one character afterwards, which broke multi-character
        // delimiters and threw on a table with no columns.
        if (!firstColumn)
        {
            HeaderFile.Append(FileDelimiter);
        }
        firstColumn = false;
        HeaderFile.Append(qoute).Append(col.ColumnName).Append(qoute);
    }
    HeaderFile.Append(System.Environment.NewLine);
    return HeaderFile.ToString();
}
/// <summary>
/// Exports a large PostgreSQL result set to a delimited text file in batches,
/// so that only one batch at a time is held in a DataTable.
/// </summary>
/// <remarks>
/// While the first-batch flag (User::bolFirstBatch) is true, <paramref name="SqlCmd"/> is
/// executed and the header line is written. After that batch the query is replaced by
/// User::strSqlCmdSub, which additionally receives the highest Promotion_Id seen so far.
/// Batches are fetched until one returns fewer rows than User::intExtractCount or no rows.
/// A file that ends up empty is deleted.
/// </remarks>
/// <param name="ConnectionString">Npgsql connection string.</param>
/// <param name="SqlCmd">Query executed while the first-batch flag is true.</param>
/// <param name="FullFilePath">Output file; created or overwritten.</param>
/// <param name="PromoBatchId">Batch id bound as a command parameter on every batch.</param>
/// <param name="FileDelimiter">Separator written between fields.</param>
/// <returns>
/// true when the export finished; false when an exception occurred (the exception is
/// reported through Dts.Events.FireError).
/// </returns>
private Boolean Generate_PostGres_File(String ConnectionString, String SqlCmd, String FullFilePath, int PromoBatchId, String FileDelimiter)
{
    String SubQuery = vars["User::strSqlCmdSub"].Value.ToString();
    Boolean FirstBatch = (Boolean)vars["User::bolFirstBatch"].Value;
    String qoute = "";
    String RS = System.Environment.NewLine;
    Int64 PromotionId = 0;
    int ExtractCount = (int)vars["User::intExtractCount"].Value;
    int RowsCount = ExtractCount;
    try
    {
        using (StreamWriter sw = File.CreateText(FullFilePath))
        {
            // A batch that is full (row count equals the limit) means more rows may follow.
            while (RowsCount >= ExtractCount)
            {
                using (NpgsqlConnection conn = new NpgsqlConnection(ConnectionString))
                {
                    conn.Open();
                    using (NpgsqlCommand cmd = new NpgsqlCommand(SqlCmd, conn))
                    {
                        // NOTE(review): "parameter" is the placeholder name from the original
                        // listing, which also referenced the undefined identifiers BatchId and
                        // primarykeyId. They are mapped here to PromoBatchId and the running
                        // max id; the names must match the placeholders used in
                        // User::strSqlCmd / User::strSqlCmdSub - confirm before deploying.
                        cmd.Parameters.AddWithValue("parameter", PromoBatchId);
                        if (!FirstBatch)
                        {
                            cmd.Parameters.AddWithValue("parameter", PromotionId);
                        }
                        using (NpgsqlDataReader reader = cmd.ExecuteReader())
                        {
                            DataTable dt = new DataTable();
                            dt.Load(reader);
                            RowsCount = dt.Rows.Count;
                            if (RowsCount == 0)
                            {
                                // Nothing left to export. Stopping here also avoids MAX() over
                                // an empty table, which yields DBNull and used to abort the
                                // export whenever the row total was an exact multiple of the limit.
                                break;
                            }
                            if (FirstBatch)
                            {
                                // The header goes out once, ahead of the first data row.
                                sw.Write(GetFileHeader(ref dt, FileDelimiter));
                            }
                            foreach (DataRow row in dt.Rows)
                            {
                                StringBuilder line = new StringBuilder();
                                Boolean firstField = true;
                                foreach (object item in row.ItemArray)
                                {
                                    if (!firstField)
                                    {
                                        line.Append(FileDelimiter);
                                    }
                                    firstField = false;
                                    line.Append(qoute);
                                    // NULL columns are written as an empty field.
                                    if (!(item is System.DBNull))
                                    {
                                        line.Append(item.ToString());
                                    }
                                    line.Append(qoute);
                                }
                                line.Append(RS);
                                sw.Write(line.ToString());
                            }
                            // Convert instead of unboxing so int/bigint/numeric key columns all work.
                            PromotionId = Convert.ToInt64(dt.Compute("MAX(Promotion_Id)", string.Empty));
                            if (FirstBatch)
                            {
                                // Every following batch uses the continuation query.
                                FirstBatch = false;
                                SqlCmd = SubQuery;
                            }
                        }
                    }
                }
            }
        }
        // Delete files that are simply empty.
        FileInfo _f = new FileInfo(FullFilePath);
        if (_f.Exists && _f.Length == 0)
        {
            _f.Delete();
        }
        return true;
    }
    catch (Exception e)
    {
        // Surface the failure in the SSIS log instead of swallowing it silently.
        Dts.Events.FireError(0, "PostGres FileGenerator Error", e.ToString(), string.Empty, 0);
        return false;
    }
}
}
}
"I hear and I forget. I see and I remember. I do and I understand."
Confucius
Jorge Novo Development Solutions,Ideas, and Crazy Thoughts.
Email: ETLDEVDBA(at)gmail.com
Twitter: @ETLDEVDBA
September 30, 2015
September 10, 2015
Connect To PostgreSQL Using SSIS
Lately I have been working on a sync process between Postgres and other databases. After much research about drivers, I found that the ODBC drivers for Postgres really suck. So, after much pain, I found the Npgsql driver, and after some thought I said: wait... I can connect using the Script Task and generate the files for my tables using metadata, and because it is SSIS there is no need to build a new service. It is a pain, but it gets the job done.
So... Here it is...
/*
Microsoft SQL Server Integration Services Script Task
Write scripts using Microsoft Visual C# 2008.
The ScriptMain is the entry point class of the script.
*/
using System;
using Npgsql;
using System.Data;
using System.IO;
using System.Text;
using Microsoft.SqlServer.Dts.Runtime;
using System.Windows.Forms;
namespace ST_b7b6f2660bba404bb3315fc71b301f0e.csproj
{
[System.AddIn.AddIn("ScriptMain", Version = "1.0", Publisher = "", Description = "")]
public partial class ScriptMain : Microsoft.SqlServer.Dts.Tasks.ScriptTask.VSTARTScriptObjectModelBase
{
#region VSTA generated code
/// <summary>
/// Outcomes this script assigns to Dts.TaskResult; each member carries the
/// numeric value of the matching SSIS DTSExecResult member.
/// </summary>
enum ScriptResults
{
    // DTSExecResult resolves through the file-level
    // "using Microsoft.SqlServer.Dts.Runtime;" directive.
    Success = DTSExecResult.Success,
    Failure = DTSExecResult.Failure
};
#endregion
/*
The execution engine calls this method when the task executes.
To access the object model, use the Dts property. Connections, variables, events,
and logging features are available as members of the Dts property as shown in the following examples.
To reference a variable, call Dts.Variables["MyCaseSensitiveVariableName"].Value;
To post a log entry, call Dts.Log("This is my log text", 999, null);
To fire an event, call Dts.Events.FireInformation(99, "test", "hit the help message", "", 0, true);
To use the connections collection use something like the following:
ConnectionManager cm = Dts.Connections.Add("OLEDB");
cm.ConnectionString = "Data Source=localhost;Initial Catalog=AdventureWorks;Provider=SQLNCLI10;Integrated Security=SSPI;Auto Translate=False;";
Before returning from this method, set the value of Dts.TaskResult to indicate success or failure.
To open Help, press F1.
*/
Variables vars = null;
/// <summary>
/// Script task entry point. Locks the SSIS variables, builds the output file
/// path "&lt;strFileDirectoryPath&gt;&lt;strTableName&gt;_&lt;strBatchId&gt;.txt", exports the
/// query result to that file and reports success or failure in Dts.TaskResult.
/// </summary>
public void Main()
{
    // SSIS variables. User::strFileName is locked for write but is not assigned here.
    Dts.VariableDispenser.LockForRead("User::strBatchId");
    Dts.VariableDispenser.LockForWrite("User::strFileName");
    Dts.VariableDispenser.LockForRead("User::strTableName");
    Dts.VariableDispenser.LockForRead("User::strSqlCmd");
    Dts.VariableDispenser.LockForRead("User::strFileDirectoryPath");
    Dts.VariableDispenser.LockForRead("User::strPostgresConStr");
    Dts.VariableDispenser.GetVariables(ref vars);

    Boolean isSuccess = false;
    try
    {
        String FileName = vars["User::strTableName"].Value.ToString() + "_" + vars["User::strBatchId"].Value.ToString() + ".txt";
        String Delimiter = "|";
        // Passed through to Generate_PostGres_File, which does not use it at present.
        int PromoBatchId = 5;
        String fullFilePath = vars["User::strFileDirectoryPath"].Value.ToString() + FileName;
        String sqlcmd = vars["User::strSqlCmd"].Value.ToString();
        // Credentials live in the connection-string variable, never in the script source.
        String ConnStr = vars["User::strPostgresConStr"].Value.ToString();

        isSuccess = Generate_PostGres_File(ConnStr, sqlcmd, fullFilePath, PromoBatchId, Delimiter);
    }
    finally
    {
        // Release the variable locks even when reading a variable or the export throws.
        vars.Unlock();
    }

    if (isSuccess)
    {
        Dts.TaskResult = (int)ScriptResults.Success;
    }
    else
    {
        Dts.TaskResult = (int)ScriptResults.Failure;
    }
}
/*
* Get Header File
*/
/// <summary>
/// Builds the header line of the export file from the column names of a table.
/// </summary>
/// <param name="dt">Table whose column names become the header fields. It is only read, never reassigned.</param>
/// <param name="FileDelimiter">Separator written between column names; may be longer than one character.</param>
/// <returns>
/// The column names joined by the delimiter and terminated by Environment.NewLine.
/// For a table without columns the result is just the newline.
/// </returns>
private String GetFileHeader(ref DataTable dt, String FileDelimiter)
{
    String qoute = "";
    StringBuilder HeaderFile = new StringBuilder();
    Boolean firstColumn = true;
    foreach (DataColumn col in dt.Columns)
    {
        // Write the delimiter between fields. The previous approach prepended it
        // and stripped one character afterwards, which broke multi-character
        // delimiters and threw on a table with no columns.
        if (!firstColumn)
        {
            HeaderFile.Append(FileDelimiter);
        }
        firstColumn = false;
        HeaderFile.Append(qoute).Append(col.ColumnName).Append(qoute);
    }
    HeaderFile.Append(System.Environment.NewLine);
    return HeaderFile.ToString();
}
/// <summary>
/// Runs a query against PostgreSQL and writes the whole result set to a delimited
/// text file: one header line with the column names, then one line per row.
/// </summary>
/// <remarks>
/// The header is only written when the query returns at least one row, so an empty
/// result produces an empty file. NULL columns are written as empty fields.
/// </remarks>
/// <param name="ConnectionString">Npgsql connection string.</param>
/// <param name="SqlCmd">Query to execute.</param>
/// <param name="FullFilePath">Output file; created or overwritten.</param>
/// <param name="PromoBatchId">Not used at present; kept so existing callers keep compiling.</param>
/// <param name="FileDelimiter">Separator written between fields.</param>
/// <returns>
/// true when the file was written; false when an exception occurred (the exception is
/// reported through Dts.Events.FireError).
/// </returns>
private Boolean Generate_PostGres_File(String ConnectionString, String SqlCmd, String FullFilePath, int PromoBatchId, String FileDelimiter)
{
    String qoute = "";
    String RS = System.Environment.NewLine;
    try
    {
        using (StreamWriter sw = File.CreateText(FullFilePath))
        using (NpgsqlConnection conn = new NpgsqlConnection(ConnectionString))
        {
            conn.Open();
            using (NpgsqlCommand cmd = new NpgsqlCommand(SqlCmd, conn))
            using (NpgsqlDataReader reader = cmd.ExecuteReader())
            {
                DataTable dt = new DataTable();
                dt.Load(reader);
                String header = GetFileHeader(ref dt, FileDelimiter);
                Boolean headerWritten = false;
                foreach (DataRow row in dt.Rows)
                {
                    if (!headerWritten)
                    {
                        // The header goes out once, ahead of the first data row.
                        sw.Write(header);
                        headerWritten = true;
                    }
                    StringBuilder line = new StringBuilder();
                    Boolean firstField = true;
                    foreach (object item in row.ItemArray)
                    {
                        // Delimiter between fields; no strip-one-character step, so
                        // delimiters longer than one character are written intact.
                        if (!firstField)
                        {
                            line.Append(FileDelimiter);
                        }
                        firstField = false;
                        line.Append(qoute);
                        if (!(item is System.DBNull))
                        {
                            line.Append(item.ToString());
                        }
                        line.Append(qoute);
                    }
                    line.Append(RS);
                    sw.Write(line.ToString());
                }
            }
            // No explicit close: the using block disposes the connection. The old
            // conn.Clone() call here was a typo that created a stray connection object.
        }
        return true;
    }
    catch (Exception e)
    {
        Dts.Events.FireError(0, "PostGres FileGenerator Error", e.ToString(), string.Empty, 0);
        return false;
    }
}
}
}
So... Here it is...
/*
Microsoft SQL Server Integration Services Script Task
Write scripts using Microsoft Visual C# 2008.
The ScriptMain is the entry point class of the script.
*/
using System;
using Npgsql;
using System.Data;
using System.IO;
using System.Text;
using Microsoft.SqlServer.Dts.Runtime;
using System.Windows.Forms;
namespace ST_b7b6f2660bba404bb3315fc71b301f0e.csproj
{
[System.AddIn.AddIn("ScriptMain", Version = "1.0", Publisher = "", Description = "")]
public partial class ScriptMain : Microsoft.SqlServer.Dts.Tasks.ScriptTask.VSTARTScriptObjectModelBase
{
#region VSTA generated code
/// <summary>
/// Outcomes this script assigns to Dts.TaskResult; each member carries the
/// numeric value of the matching SSIS DTSExecResult member.
/// </summary>
enum ScriptResults
{
    // DTSExecResult resolves through the file-level
    // "using Microsoft.SqlServer.Dts.Runtime;" directive.
    Success = DTSExecResult.Success,
    Failure = DTSExecResult.Failure
};
#endregion
/*
The execution engine calls this method when the task executes.
To access the object model, use the Dts property. Connections, variables, events,
and logging features are available as members of the Dts property as shown in the following examples.
To reference a variable, call Dts.Variables["MyCaseSensitiveVariableName"].Value;
To post a log entry, call Dts.Log("This is my log text", 999, null);
To fire an event, call Dts.Events.FireInformation(99, "test", "hit the help message", "", 0, true);
To use the connections collection use something like the following:
ConnectionManager cm = Dts.Connections.Add("OLEDB");
cm.ConnectionString = "Data Source=localhost;Initial Catalog=AdventureWorks;Provider=SQLNCLI10;Integrated Security=SSPI;Auto Translate=False;";
Before returning from this method, set the value of Dts.TaskResult to indicate success or failure.
To open Help, press F1.
*/
Variables vars = null;
/// <summary>
/// Script task entry point. Locks the SSIS variables, builds the output file
/// path "&lt;strFileDirectoryPath&gt;&lt;strTableName&gt;_&lt;strBatchId&gt;.txt", exports the
/// query result to that file and reports success or failure in Dts.TaskResult.
/// </summary>
public void Main()
{
    // SSIS variables. User::strFileName is locked for write but is not assigned here.
    Dts.VariableDispenser.LockForRead("User::strBatchId");
    Dts.VariableDispenser.LockForWrite("User::strFileName");
    Dts.VariableDispenser.LockForRead("User::strTableName");
    Dts.VariableDispenser.LockForRead("User::strSqlCmd");
    Dts.VariableDispenser.LockForRead("User::strFileDirectoryPath");
    Dts.VariableDispenser.LockForRead("User::strPostgresConStr");
    Dts.VariableDispenser.GetVariables(ref vars);

    Boolean isSuccess = false;
    try
    {
        String FileName = vars["User::strTableName"].Value.ToString() + "_" + vars["User::strBatchId"].Value.ToString() + ".txt";
        String Delimiter = "|";
        // Passed through to Generate_PostGres_File, which does not use it at present.
        int PromoBatchId = 5;
        String fullFilePath = vars["User::strFileDirectoryPath"].Value.ToString() + FileName;
        String sqlcmd = vars["User::strSqlCmd"].Value.ToString();
        // Credentials live in the connection-string variable, never in the script source.
        String ConnStr = vars["User::strPostgresConStr"].Value.ToString();

        isSuccess = Generate_PostGres_File(ConnStr, sqlcmd, fullFilePath, PromoBatchId, Delimiter);
    }
    finally
    {
        // Release the variable locks even when reading a variable or the export throws.
        vars.Unlock();
    }

    if (isSuccess)
    {
        Dts.TaskResult = (int)ScriptResults.Success;
    }
    else
    {
        Dts.TaskResult = (int)ScriptResults.Failure;
    }
}
/*
* Get Header File
*/
/// <summary>
/// Builds the header line of the export file from the column names of a table.
/// </summary>
/// <param name="dt">Table whose column names become the header fields. It is only read, never reassigned.</param>
/// <param name="FileDelimiter">Separator written between column names; may be longer than one character.</param>
/// <returns>
/// The column names joined by the delimiter and terminated by Environment.NewLine.
/// For a table without columns the result is just the newline.
/// </returns>
private String GetFileHeader(ref DataTable dt, String FileDelimiter)
{
    String qoute = "";
    StringBuilder HeaderFile = new StringBuilder();
    Boolean firstColumn = true;
    foreach (DataColumn col in dt.Columns)
    {
        // Write the delimiter between fields. The previous approach prepended it
        // and stripped one character afterwards, which broke multi-character
        // delimiters and threw on a table with no columns.
        if (!firstColumn)
        {
            HeaderFile.Append(FileDelimiter);
        }
        firstColumn = false;
        HeaderFile.Append(qoute).Append(col.ColumnName).Append(qoute);
    }
    HeaderFile.Append(System.Environment.NewLine);
    return HeaderFile.ToString();
}
/// <summary>
/// Runs a query against PostgreSQL and writes the whole result set to a delimited
/// text file: one header line with the column names, then one line per row.
/// </summary>
/// <remarks>
/// The header is only written when the query returns at least one row, so an empty
/// result produces an empty file. NULL columns are written as empty fields.
/// </remarks>
/// <param name="ConnectionString">Npgsql connection string.</param>
/// <param name="SqlCmd">Query to execute.</param>
/// <param name="FullFilePath">Output file; created or overwritten.</param>
/// <param name="PromoBatchId">Not used at present; kept so existing callers keep compiling.</param>
/// <param name="FileDelimiter">Separator written between fields.</param>
/// <returns>
/// true when the file was written; false when an exception occurred (the exception is
/// reported through Dts.Events.FireError).
/// </returns>
private Boolean Generate_PostGres_File(String ConnectionString, String SqlCmd, String FullFilePath, int PromoBatchId, String FileDelimiter)
{
    String qoute = "";
    String RS = System.Environment.NewLine;
    try
    {
        using (StreamWriter sw = File.CreateText(FullFilePath))
        using (NpgsqlConnection conn = new NpgsqlConnection(ConnectionString))
        {
            conn.Open();
            using (NpgsqlCommand cmd = new NpgsqlCommand(SqlCmd, conn))
            using (NpgsqlDataReader reader = cmd.ExecuteReader())
            {
                DataTable dt = new DataTable();
                dt.Load(reader);
                String header = GetFileHeader(ref dt, FileDelimiter);
                Boolean headerWritten = false;
                foreach (DataRow row in dt.Rows)
                {
                    if (!headerWritten)
                    {
                        // The header goes out once, ahead of the first data row.
                        sw.Write(header);
                        headerWritten = true;
                    }
                    StringBuilder line = new StringBuilder();
                    Boolean firstField = true;
                    foreach (object item in row.ItemArray)
                    {
                        // Delimiter between fields; no strip-one-character step, so
                        // delimiters longer than one character are written intact.
                        if (!firstField)
                        {
                            line.Append(FileDelimiter);
                        }
                        firstField = false;
                        line.Append(qoute);
                        if (!(item is System.DBNull))
                        {
                            line.Append(item.ToString());
                        }
                        line.Append(qoute);
                    }
                    line.Append(RS);
                    sw.Write(line.ToString());
                }
            }
            // No explicit close: the using block disposes the connection. The old
            // conn.Clone() call here was a typo that created a stray connection object.
        }
        return true;
    }
    catch (Exception e)
    {
        Dts.Events.FireError(0, "PostGres FileGenerator Error", e.ToString(), string.Empty, 0);
        return false;
    }
}
}
}
Subscribe to:
Posts (Atom)