TaskCategory
File
TSQLQuery
StoredProc
InMemory ( .NET Classes)
TaskOperationType : Will define the operation
FileExportToTask
FileParseTask
FileGenerationTask
FileToStoredProcedureTask
EmailTask
ExecuteStoredProcedureTask
SQLBulkCopyTask
TaskCategoryMetaDataType : Meta Data Attributes for a Task Source Type
FileExportTask : sourcefile, destFile path, etc
DatabaseOperations : storedProc name, parameters, etc
TaskCategoryMetaData : Meta Data Attributes Values that relate a Task source Type Id
FileExportTask : sourcefile, destFile path, etc
DatabaseOperations : storedProc name, parameters, etc
TaskInstance
TaskOperationTypeId
FileSourceTaskCategoryMetaDetail ( TaskCategoryMetaDataId )
FileDestinationTaskCategoryMetaDetail ( TaskCategoryMetaDataId )
--Workflow
WorkflowSteps -- Predefined chain of Tasks
ValidationStep, DataExtractionStep — e.g. a DataExtractionStep is a combination of FileExportTask and SqlBulkCopyTask
WorkUnit
WorkflowStepId
WorkflowStepDependency (?)
WorkFlowInstance :
list of Tasks in a Queue
WorkflowInstanceHistory
--Dao
FileDao
Read a Flat File and get Reader
SqlBulkUpload
---what can be implemented ?
1) Read CSV files in a multi-threaded fashion and simultaneously do a SQL bulk copy into the database.
2) As a Start implement the following FileExportTask, SQLBulkCopyTask
How can it be implemented ?
public static void ProcessWorkUnit(Object stateInfo)
{
WorkUnit workUnit = (WorkUnit) stateInfo;
Thread.Sleep(1000);
Console.WriteLine(workUnit.Name);
}
// Driver sketch (pseudo-code — the enclosing class and method braces are not shown).
Main ()
// Build 100 work units to fan out to the thread pool.
var workUnits = new List<WorkUnit>();
for (int i = 0; i < 100; i++){
var workUnit = new WorkUnit {Id = i, Name = "work Unit " + i};
workUnits.Add(workUnit);
}
// Queue the task and data.
foreach (var workUnit in workUnits)
ThreadPool.QueueUserWorkItem(new WaitCallback(ProcessWorkUnit), workUnit);
Console.WriteLine("Main thread does some work, then sleeps.");
// If you comment out the Sleep, the main thread exits before
// the ThreadPool task has a chance to run. ThreadPool uses
// background threads, which do not keep the application
// running. (This is a simple example of a race condition.)
// NOTE(review): a fixed 1s sleep cannot cover 100 items when ProcessWorkUnit
// itself sleeps 1s per item — most callbacks will not run before the process
// exits. A CountdownEvent (or Task.WhenAll) would be the reliable wait.
Thread.Sleep(1000);
Console.WriteLine("Main thread exits.");
====Parallel Processing
Source => Models
Processing Tasks
<= Synch Operations
Load() => Load List<Model> from a .csv file [Future this can be pushed into a Queue]
Transform() => Transform List<Model> to List<ModelTransaction> ( Synch Operations but can be executed in data parallelism)
<= Parallel Operations
Enrich() => Enrich List<ModelTransaction>
Compute() => Compute DailyPnL, MTDPnL, YTDPnL
Validate() =>
Validate List<ModelTransaction> for Enrichment
Validate List<ModelTransaction> for Computations
Persist() =>
Write modelTransactions using SQLBulkCopy
Target => ModelTransactions
=> Implement the following using ThreadPool.QueueUserWorkItem
=> Implement the following using Task Parallelism
=> Plot the Performance Difference I/O, CPU / Memory utilization ....
ModelRepository
Load()
Transform()
TaskRepository
ComputeTask
EnrichTask
ValidateTask
PersistTask
File
TSQLQuery
StoredProc
InMemory ( .NET Classes)
TaskOperationType : Will define the operation
FileExportToTask
FileParseTask
FileGenerationTask
FileToStoredProcedureTask
EmailTask
ExecuteStoredProcedureTask
SQLBulkCopyTask
TaskCategoryMetaDataType : Meta Data Attributes for a Task Source Type
FileExportTask : sourcefile, destFile path, etc
DatabaseOperations : storedProc name, parameters, etc
TaskCategoryMetaData : Meta Data Attributes Values that relate a Task source Type Id
FileExportTask : sourcefile, destFile path, etc
DatabaseOperations : storedProc name, parameters, etc
TaskOperationTypeId
FileSourceTaskCategoryMetaDetail ( TaskCategoryMetaDataId )
FileDestinationTaskCategoryMetaDetail ( TaskCategoryMetaDataId )
--Workflow
WorkflowSteps -- Predefined chain of Tasks
ValidationStep, DataExtractionStep — e.g. a DataExtractionStep is a combination of FileExportTask and SqlBulkCopyTask
WorkUnit
WorkflowStepId
WorkflowStepDependency (?)
WorkFlowInstance :
list of Tasks in a Queue
WorkflowInstanceHistory
--Dao
FileDao
Read a Flat File and get Reader
SqlBulkUpload
---what can be implemented ?
1) Read CSV files in a multi-threaded fashion and simultaneously do a SQL bulk copy into the database.
2) As a Start implement the following FileExportTask, SQLBulkCopyTask
How can it be implemented ?
- WorkflowBroker will read and write from a syncQueue.
- Producer Consumer Queue http://www.albahari.com/threading/part2.aspx#_AutoResetEvent
--------------
// --- Sample reference data (in a real run this would come from the database) ---
var fundFamilies = new List<FundFamily>
{
    new FundFamily {Id = 1, Code = "RE"},
    new FundFamily {Id = 2, Code = "NON-RE"},
    new FundFamily {Id = 3, Code = "PEQ"}
};
var funds = new List<Fund>
{
    new Fund {Id = 1, Code = "MSREF1", FundFamilyId = 1},
    new Fund {Id = 2, Code = "MSREF2", FundFamilyId = 1},
    new Fund {Id = 3, Code = "EQ1", FundFamilyId = 2}
};
// NOTE(review): modelDatas is built but never read below — confirm whether it
// was meant to be the source for the ModelTransaction list.
var modelDatas = new List<ModelData>
{
    new ModelData {ModelCode = "BSP", ModelName = "Boston Seaport", FundCode = "MSREF1"},
    new ModelData {ModelCode = "WBS", ModelName = "Woodbrook Sea", FundCode = "MSREF1"},
    new ModelData {ModelCode = "EQTL", ModelName = "Private Tryport", FundCode = "EQ1"},
    new ModelData {ModelCode = "EQTL1", ModelName = "Private Tryport2", FundCode = "EQ1"}
};

// NOTE(review): query is never enumerated — kept for parity with the sketch.
var query = from f in funds
            join ff in fundFamilies on f.FundFamilyId equals ff.Id
            select new {FundFamilyId = ff.Id, FundFamilyCode = ff.Code, Fundid = f.Id, FundCode = f.Code};

// Transactions to be enriched with FundId / FundFamilyId from the fund metadata.
var modelTransactions = new List<ModelTransaction>
{
    new ModelTransaction {ModelCode = "BSP", ModelName = "Boston Seaport", FundCode = "MSREF1"},
    new ModelTransaction {ModelCode = "WBS", ModelName = "Woodbrook Sea", FundCode = "MSREF1"},
    new ModelTransaction {ModelCode = "EQTL", ModelName = "Private Tryport", FundCode = "EQ1"},
    new ModelTransaction {ModelCode = "EQTL1", ModelName = "Private Tryport2", FundCode = "EQ1"}
};

Console.WriteLine("--Before Update----");
foreach (ModelTransaction modelTransaction in modelTransactions)
    // NOTE(review): FundFamilyId is printed twice (args {0} and {3}); one of
    // them was probably meant to be ModelCode. Output kept as-is.
    Console.WriteLine("Funds info - {0}, {1}, {2}, {3}", modelTransaction.FundFamilyId,
        modelTransaction.FundCode, modelTransaction.FundId, modelTransaction.FundFamilyId);

Console.WriteLine("--After Update----");
// Materialize the join ONCE. The original kept a deferred query and re-ran the
// entire join via FirstOrDefault TWICE per transaction — and would have thrown
// a NullReferenceException for any FundCode with no matching fund.
var metaDataByFund = (from f in funds
                      join ff in fundFamilies on f.FundFamilyId equals ff.Id
                      select new {FundCode = f.Code, f.FundFamilyId, FundId = f.Id}).ToList();

foreach (ModelTransaction modelTransaction in modelTransactions)
{
    // Single lookup per transaction, with an explicit miss check.
    var meta = metaDataByFund.FirstOrDefault(i => i.FundCode == modelTransaction.FundCode);
    if (meta == null)
        continue; // no matching fund — leave the transaction unenriched
    modelTransaction.FundId = meta.FundId;
    modelTransaction.FundFamilyId = meta.FundFamilyId;
}

foreach (ModelTransaction modelTransaction in modelTransactions)
    Console.WriteLine("Funds info - {0}, {1}, {2}, {3}", modelTransaction.FundFamilyId,
        modelTransaction.FundCode, modelTransaction.FundId, modelTransaction.FundFamilyId);
==============
/// <summary>
/// A single schedulable unit of work handed to the thread pool by the driver.
/// </summary>
public class WorkUnit
{
    /// <summary>Numeric identifier (set to the loop index by the driver).</summary>
    public int Id { get; set; }
    /// <summary>Short code for the work unit.</summary>
    public string Code { get; set; }
    /// <summary>Display name; ProcessWorkUnit writes this to the console.</summary>
    public string Name { get; set; }
}
/// <summary>
/// Coordinates processing of work units. (Skeleton — processing logic TBD.)
/// </summary>
public class WorkUnitManager {
    // Initialized eagerly; the original never assigned this field, so any
    // future use would have thrown a NullReferenceException.
    private readonly List<WorkUnit> _tasks = new List<WorkUnit>();

    public WorkUnitManager()
    {
    }

    /// <summary>
    /// Processes a single model. Currently a stub that always reports success.
    /// </summary>
    /// <param name="model">The model to process.</param>
    /// <returns>True when processing succeeds (always, in this stub).</returns>
    public bool ProcessModel(Model model)
    {
        return true;
    }
}
==============
/// <summary>
/// A single schedulable unit of work handed to the thread pool by the driver.
/// </summary>
public class WorkUnit
{
    /// <summary>Numeric identifier (set to the loop index by the driver).</summary>
    public int Id { get; set; }
    /// <summary>Short code for the work unit.</summary>
    public string Code { get; set; }
    /// <summary>Display name; ProcessWorkUnit writes this to the console.</summary>
    public string Name { get; set; }
}
/// <summary>
/// Coordinates processing of work units. (Skeleton — processing logic TBD.)
/// </summary>
public class WorkUnitManager {
    // Initialized eagerly; the original never assigned this field, so any
    // future use would have thrown a NullReferenceException.
    private readonly List<WorkUnit> _tasks = new List<WorkUnit>();

    public WorkUnitManager()
    {
    }

    /// <summary>
    /// Processes a single model. Currently a stub that always reports success.
    /// </summary>
    /// <param name="model">The model to process.</param>
    /// <returns>True when processing succeeds (always, in this stub).</returns>
    public bool ProcessModel(Model model)
    {
        return true;
    }
}
/// <summary>
/// ThreadPool callback: processes one <see cref="WorkUnit"/> passed in as boxed state.
/// Currently simulates work with a 1-second sleep, then prints the unit's Name.
/// </summary>
/// <param name="stateInfo">Expected to be a <see cref="WorkUnit"/>.</param>
public static void ProcessWorkUnit(Object stateInfo)
{
    // Use 'as' + explicit check instead of a hard cast so a bad queue item
    // fails with a clear message rather than an InvalidCastException deep
    // inside an unobserved thread-pool thread.
    var workUnit = stateInfo as WorkUnit;
    if (workUnit == null)
        throw new ArgumentException("stateInfo must be a WorkUnit.", "stateInfo");

    Thread.Sleep(1000); // simulate one second of work per unit
    Console.WriteLine(workUnit.Name);
}
// Driver sketch (pseudo-code — the enclosing class and method braces are not shown).
Main ()
// Build 100 work units to fan out to the thread pool.
var workUnits = new List<WorkUnit>();
for (int i = 0; i < 100; i++){
var workUnit = new WorkUnit {Id = i, Name = "work Unit " + i};
workUnits.Add(workUnit);
}
// Queue the task and data.
foreach (var workUnit in workUnits)
ThreadPool.QueueUserWorkItem(new WaitCallback(ProcessWorkUnit), workUnit);
Console.WriteLine("Main thread does some work, then sleeps.");
// If you comment out the Sleep, the main thread exits before
// the ThreadPool task has a chance to run. ThreadPool uses
// background threads, which do not keep the application
// running. (This is a simple example of a race condition.)
// NOTE(review): a fixed 1s sleep cannot cover 100 items when ProcessWorkUnit
// itself sleeps 1s per item — most callbacks will not run before the process
// exits. A CountdownEvent (or Task.WhenAll) would be the reliable wait.
Thread.Sleep(1000);
Console.WriteLine("Main thread exits.");
====Parallel Processing
Source => Models
Processing Tasks
<= Synch Operations
Load() => Load List<Model> from a .csv file [Future this can be pushed into a Queue]
Transform() => Transform List<Model> to List<ModelTransaction> ( Synch Operations but can be executed in data parallelism)
<= Parallel Operations
Enrich() => Enrich List<ModelTransaction>
Compute() => Compute DailyPnL, MTDPnL, YTDPnL
Validate() =>
Validate List<ModelTransaction> for Enrichment
Validate List<ModelTransaction> for Computations
Persist() =>
Write modelTransactions using SQLBulkCopy
Target => ModelTransactions
=> Implement the following using ThreadPool.QueueUserWorkItem
=> Implement the following using Task Parallelism
=> Plot the Performance Difference I/O, CPU / Memory utilization ....
ModelRepository
Load()
Transform()
TaskRepository
ComputeTask
EnrichTask
ValidateTask
PersistTask
No comments:
Post a Comment