Ok, we extract data from Postgres, but what happens when those tables have millions of records and you still need to use a DataTable for it?
Well, the solution is to create two queries: the first query gets the first LIMIT 50000 records, then you extract the max id from the DataTable, set your flag to indicate that the next run is no longer the first, and keep looping 50k at a time until the extracted row count is less than the limit. A rough sketch of what the two queries might look like is just below, followed by the full script task.
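The script reads the two queries from the SSIS variables User::strSqlCmd (first batch) and User::strSqlCmdSub (every batch after that). They are not shown in the script itself, so as a minimal sketch (table name, column names, batch size and parameter placeholders here are assumptions; they just have to line up with what the script binds as :batch_id and :max_id):
// Hypothetical contents of the two SSIS query variables -- adjust table,
// columns and batch size to your own schema.
// First batch: filter on the batch id only, capped at the batch size.
string strSqlCmd =
"SELECT * FROM promotion " +
"WHERE batch_id = :batch_id " +
"ORDER BY promotion_id " +
"LIMIT 50000";
// Later batches: same query plus a keyset filter on the last max Promotion_Id seen,
// so each run picks up where the previous one stopped.
string strSqlCmdSub =
"SELECT * FROM promotion " +
"WHERE batch_id = :batch_id AND promotion_id > :max_id " +
"ORDER BY promotion_id " +
"LIMIT 50000";
And here is the full script task.... :-)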
/*
Microsoft SQL Server Integration Services Script Task
Write scripts using Microsoft Visual C# 2008.
The ScriptMain is the entry point class of the script.
*/
using System;
using Npgsql;
using System.Data;
using System.IO;
using System.Text;
using Microsoft.SqlServer.Dts.Runtime;
using System.Windows.Forms;
namespace ST_b7b6f2660bba404bb3315fc71b301f0e.csproj
{
[System.AddIn.AddIn("ScriptMain", Version = "1.0", Publisher = "", Description = "")]
public partial class ScriptMain : Microsoft.SqlServer.Dts.Tasks.ScriptTask.VSTARTScriptObjectModelBase
{
#region VSTA generated code
enum ScriptResults
{
Success = Microsoft.SqlServer.Dts.Runtime.DTSExecResult.Success,
Failure = Microsoft.SqlServer.Dts.Runtime.DTSExecResult.Failure
};
#endregion
/*
The execution engine calls this method when the task executes.
To access the object model, use the Dts property. Connections, variables, events,
and logging features are available as members of the Dts property as shown in the following examples.
To reference a variable, call Dts.Variables["MyCaseSensitiveVariableName"].Value;
To post a log entry, call Dts.Log("This is my log text", 999, null);
To fire an event, call Dts.Events.FireInformation(99, "test", "hit the help message", "", 0, true);
To use the connections collection use something like the following:
ConnectionManager cm = Dts.Connections.Add("OLEDB");
cm.ConnectionString = "Data Source=localhost;Initial Catalog=AdventureWorks;Provider=SQLNCLI10;Integrated Security=SSPI;Auto Translate=False;";
Before returning from this method, set the value of Dts.TaskResult to indicate success or failure.
To open Help, press F1.
*/
Variables vars = null;
public void Main()
{
//SSIS Variables
Dts.VariableDispenser.LockForRead("User::strBatchId");
Dts.VariableDispenser.LockForRead("User::strfTableName");
Dts.VariableDispenser.LockForRead("User::intPromoBatchId");
Dts.VariableDispenser.LockForRead("User::strSqlCmd");
Dts.VariableDispenser.LockForRead("User::strFileDirectoryPath");
Dts.VariableDispenser.LockForRead("User::strPostgresCon5");
Dts.VariableDispenser.LockForRead("User::strImportDirectory5");
Dts.VariableDispenser.LockForRead("User::intExtractCount");
Dts.VariableDispenser.LockForRead("User::bolFirstBatch");
Dts.VariableDispenser.LockForRead("User::strSqlCmdSub");
Dts.VariableDispenser.GetVariables(ref vars);
int promoBatchId = (int)vars["User::intPromoBatchId"].Value;
Boolean FirstBatch = (Boolean)vars["User::bolFirstBatch"].Value;
String FileName = vars["User::strfTableName"].Value.ToString() + "_" + vars["User::strBatchId"].Value.ToString() + "_";
FileName = FileName + promoBatchId.ToString() + ".txt";
String Delimiter = "|";
int PromoBatchId = promoBatchId;
String fullFilePath = vars["User::strImportDirectory5"].Value.ToString() + FileName;
Boolean isSuccess;
String sqlcmd = vars["User::strSqlCmd"].Value.ToString();
String ConnStr = vars["User::strPostgresCon5"].Value.ToString();
isSuccess = Generate_PostGres_File(ConnStr, sqlcmd, fullFilePath, PromoBatchId, Delimiter);
vars.Unlock();
// Report success or failure back to the package based on the extract result
Dts.TaskResult = isSuccess ? (int)ScriptResults.Success : (int)ScriptResults.Failure;
}
/*
* Build the delimited header row from the DataTable's column names
*/
private String GetFileHeader(ref DataTable dt, String FileDelimiter)
{
StringBuilder HeaderFile = new StringBuilder();
String quote = "";
String RS = System.Environment.NewLine;
foreach (DataColumn col in dt.Columns)
{
HeaderFile.Append(FileDelimiter);
HeaderFile.Append(quote + col.ColumnName + quote);
}
// Drop the leading delimiter and terminate the header line
HeaderFile.Remove(0, 1);
HeaderFile.Append(RS);
return HeaderFile.ToString();
}
private Boolean Generate_PostGres_File(String ConnectionString, String SqlCmd, String FullFilePath, int PromoBatchId, String FileDelimiter)
{
String tablename = vars["User::strfTableName"].Value.ToString();
String SubQuery = vars["User::strSqlCmdSub"].Value.ToString();
Boolean FirstBatch = (Boolean)vars["User::bolFirstBatch"].Value;
Boolean head = false; // header row written yet?
String quote = ""; // optional text qualifier (empty = none)
String RS = System.Environment.NewLine;
Int64 PromotionId = 0; // highest Promotion_Id seen so far; used as the keyset cursor
int ExtractCount = (int)vars["User::intExtractCount"].Value; // batch size, e.g. 50000
int RowsCount = ExtractCount; // primed so the loop body runs at least once
try
{
using (StreamWriter sw = File.CreateText(FullFilePath))
{
// Keep pulling batches until one comes back smaller than the batch size
while (RowsCount >= ExtractCount)
{
using (NpgsqlConnection conn = new NpgsqlConnection(ConnectionString))
{
conn.Open();
using (NpgsqlCommand cmd = new NpgsqlCommand(SqlCmd, conn))
{
if (!FirstBatch)
{
// Later batches: bind the batch id plus the last max Promotion_Id (keyset cursor).
// The parameter names are assumptions here -- they must match the placeholders
// used in User::strSqlCmdSub (e.g. :batch_id and :max_id).
cmd.Parameters.AddWithValue("batch_id", PromoBatchId);
cmd.Parameters.AddWithValue("max_id", PromotionId);
}
else
{
// First batch: only the batch id (must match the placeholder in User::strSqlCmd).
cmd.Parameters.AddWithValue("batch_id", PromoBatchId);
}
using (NpgsqlDataReader reader = cmd.ExecuteReader())
{
DataTable dt = new DataTable();
int cnt = 0;
dt.Load(reader);
if (dt.Rows.Count > 0) // anything extracted in this batch?
{
String header = String.Empty ;
if (FirstBatch)
{
header = GetFileHeader(ref dt, FileDelimiter);
}
foreach (DataRow row in dt.Rows)
{
StringBuilder rdata = new StringBuilder();
StringBuilder tdata = new StringBuilder();
if (!head && FirstBatch)
{
rdata.Append(header);
}
head = true;
foreach (object item in row.ItemArray)
{
if (item is System.DBNull)
{
tdata.Append(FileDelimiter + quote + quote);
}
else
{
String columnData = item.ToString();
tdata.Append(FileDelimiter);
tdata.Append(quote + columnData + quote);
}
}
tdata.Remove(0, 1);
tdata.Append(RS);
rdata.Append(tdata.ToString());
tdata = null;
sw.Write(rdata);
}
}
cnt = dt.Rows.Count;
RowsCount = cnt;
// Remember the highest Promotion_Id in this batch; it becomes the keyset cursor
// for the next query. Guard against an empty batch (Compute returns DBNull).
if (cnt > 0)
{
PromotionId = (Int64)dt.Compute("MAX(Promotion_Id)", string.Empty);
}
if (FirstBatch)
{
FirstBatch = false;
// From the second batch onward, switch to the keyset query
SqlCmd = vars["User::strSqlCmdSub"].Value.ToString();
}
}
}
conn.Close();
}
}
}
// Delete the output file if it ended up empty (nothing was extracted)
FileInfo _f = new FileInfo(FullFilePath);
if (_f.Exists && _f.Length == 0)
{
_f.Delete();
}
return true;
}
catch (Exception ex)
{
// Surface the error to the package log instead of swallowing it silently
Dts.Events.FireError(0, "Generate_PostGres_File", ex.Message, String.Empty, 0);
return false;
}
}
}
}
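For reference, based on how the script reads them: User::strSqlCmd holds the first-batch query, User::strSqlCmdSub the follow-up keyset query, User::intExtractCount the batch size (50000 in this case), User::bolFirstBatch should start out true, and User::strPostgresCon5 / User::strImportDirectory5 point at the Npgsql connection string and the output folder. One design note: the cursor column (Promotion_Id here) should be unique per row, a primary key works best, so the "greater than max id" filter never skips or repeats rows, and an index on it keeps each batch query fast.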