Wednesday, July 16, 2014

Multi-Threaded DataManager - Design

TaskCategory
File
TSQLQuery
StoredProc
InMemory (.NET classes)

TaskOperationType : Defines the operation to perform
FileExportTask
FileParseTask
FileGenerationTask
FileToStoredProcedureTask
EmailTask
ExecuteStoredProcedureTask
SQLBulkCopyTask


TaskCategoryMetaDataType : Metadata attributes for a task source type
FileExportTask : sourceFile, destFile path, etc.
DatabaseOperations : storedProc name, parameters, etc.

TaskCategoryMetaData : Metadata attribute values, related to a task source type by its Id
FileExportTask : sourceFile, destFile path, etc.
DatabaseOperations : storedProc name, parameters, etc.

TaskInstance
TaskOperationTypeId
FileSourceTaskCategoryMetaDataId    ( TaskCategoryMetaDataId )
FileDestinationTaskCategoryMetaDataId    ( TaskCategoryMetaDataId )
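
One possible way to read the entities above as C# types is sketched here. The enum members come from the lists in these notes; the property names and types, the attribute/value split between TaskCategoryMetaDataType and TaskCategoryMetaData, the Describe helper, and the file paths are all assumptions made for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public enum TaskCategory { File, TSQLQuery, StoredProc, InMemory }

public enum TaskOperationType
{
    FileExportTask, FileParseTask, FileGenerationTask, FileToStoredProcedureTask,
    EmailTask, ExecuteStoredProcedureTask, SQLBulkCopyTask
}

// One attribute definition, e.g. "sourceFile" for file-based tasks (assumed shape).
public class TaskCategoryMetaDataType
{
    public int Id { get; set; }
    public TaskCategory Category { get; set; }
    public string AttributeName { get; set; }
}

// One concrete value for an attribute definition (assumed shape).
public class TaskCategoryMetaData
{
    public int Id { get; set; }
    public int TaskCategoryMetaDataTypeId { get; set; }
    public string Value { get; set; }
}

// A task instance points at its source and destination metadata rows by Id.
public class TaskInstance
{
    public TaskOperationType TaskOperationTypeId { get; set; }
    public int FileSourceTaskCategoryMetaDataId { get; set; }
    public int FileDestinationTaskCategoryMetaDataId { get; set; }
}

public static class TaskModelDemo
{
    // Hypothetical helper: resolves the metadata Ids of a task to their values.
    public static string Describe(TaskInstance task, List<TaskCategoryMetaData> values)
    {
        string source = values.Single(v => v.Id == task.FileSourceTaskCategoryMetaDataId).Value;
        string dest = values.Single(v => v.Id == task.FileDestinationTaskCategoryMetaDataId).Value;
        return task.TaskOperationTypeId + ": " + source + " -> " + dest;
    }

    public static void Main()
    {
        var values = new List<TaskCategoryMetaData>
        {
            new TaskCategoryMetaData {Id = 1, TaskCategoryMetaDataTypeId = 1, Value = "in/models.csv"},
            new TaskCategoryMetaData {Id = 2, TaskCategoryMetaDataTypeId = 2, Value = "out/models.csv"}
        };
        var task = new TaskInstance
        {
            TaskOperationTypeId = TaskOperationType.FileExportTask,
            FileSourceTaskCategoryMetaDataId = 1,
            FileDestinationTaskCategoryMetaDataId = 2
        };
        Console.WriteLine(Describe(task, values));
    }
}
```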

--Workflow
WorkflowSteps -- Predefined chain of Tasks
Examples: ValidationStep, DataExtractionStep. DataExtractionStep is a combination of FileExportTask and SQLBulkCopyTask

WorkUnit 
WorkflowStepId

WorkflowStepDependency (?)

WorkflowInstance :
list of Tasks in a Queue

WorkflowInstanceHistory

--Dao
FileDao
Read a Flat File and get Reader

SqlBulkUpload
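
As a rough sketch of the FileDao side, the snippet below reads a flat file into a DataTable. The ReadCsv method name, the header-row convention, and the naive comma split (no quoted fields) are assumptions for illustration, not part of the design above. The resulting table, or the reader returned by its CreateDataReader() method, is the kind of input that SqlBulkCopy.WriteToServer accepts; the database call itself is left out because it needs a live connection.

```csharp
using System;
using System.Data;
using System.IO;

public static class FileDao
{
    // Reads comma-separated text with a header row into a DataTable.
    // Naive split: quoted fields and embedded commas are not handled.
    public static DataTable ReadCsv(TextReader reader)
    {
        var table = new DataTable();
        string header = reader.ReadLine();
        if (header == null)
            return table;   // empty input: no columns, no rows

        foreach (string column in header.Split(','))
            table.Columns.Add(column.Trim(), typeof(string));

        string line;
        while ((line = reader.ReadLine()) != null)
        {
            if (line.Length == 0)
                continue;   // skip blank lines
            string[] fields = line.Split(',');
            table.Rows.Add(fields);   // throws if a row has more fields than columns
        }
        return table;
    }
}

public static class FileDaoDemo
{
    public static void Main()
    {
        string csv = "ModelCode,ModelName,FundCode\n" +
                     "BSP,Boston Seaport,MSREF1\n" +
                     "WBS,Woodbrook Sea,MSREF1\n";
        DataTable table = FileDao.ReadCsv(new StringReader(csv));
        Console.WriteLine("{0} rows, {1} columns", table.Rows.Count, table.Columns.Count);
    }
}
```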

--What can be implemented?
1) Read CSV files in a multi-threaded fashion and simultaneously SqlBulkCopy them into the database.
2) As a start, implement the following: FileExportTask, SQLBulkCopyTask

--How can it be implemented?

  1. WorkflowBroker will read from and write to a syncQueue.
  2. Producer/consumer queue: http://www.albahari.com/threading/part2.aspx#_AutoResetEvent
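
A minimal sketch of such a producer/consumer queue, using the AutoResetEvent approach the link above names. The class name, the string-typed tasks, the handler callback, and the use of null as the shutdown signal are assumptions for illustration; a real WorkflowBroker would queue task objects instead.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

public class ProducerConsumerQueue : IDisposable
{
    private readonly object _locker = new object();
    private readonly Queue<string> _tasks = new Queue<string>();
    private readonly EventWaitHandle _wh = new AutoResetEvent(false);
    private readonly Action<string> _handler;
    private readonly Thread _worker;

    public ProducerConsumerQueue(Action<string> handler)
    {
        _handler = handler;
        _worker = new Thread(Work);
        _worker.Start();
    }

    // Producer side: add a task and wake the consumer.
    public void Enqueue(string task)
    {
        lock (_locker) _tasks.Enqueue(task);
        _wh.Set();
    }

    // Queues the shutdown signal, then waits for the consumer to drain the queue.
    public void Dispose()
    {
        Enqueue(null);
        _worker.Join();
        _wh.Dispose();
    }

    // Consumer side: drain the queue, then block until signalled again.
    private void Work()
    {
        while (true)
        {
            string task = null;
            bool hasTask = false;
            lock (_locker)
            {
                if (_tasks.Count > 0)
                {
                    task = _tasks.Dequeue();
                    hasTask = true;
                }
            }
            if (!hasTask)
            {
                _wh.WaitOne();   // queue empty: sleep until Enqueue signals
                continue;
            }
            if (task == null)
                return;          // shutdown signal
            _handler(task);
        }
    }
}

public static class QueueDemo
{
    public static void Main()
    {
        var processed = new List<string>();
        using (var queue = new ProducerConsumerQueue(processed.Add))
        {
            queue.Enqueue("FileExportTask");
            queue.Enqueue("SQLBulkCopyTask");
        }
        // A single consumer handles tasks in FIFO order.
        Console.WriteLine(string.Join(", ", processed));   // FileExportTask, SQLBulkCopyTask
    }
}
```

With one consumer thread the handler never runs concurrently with itself, so the list in the demo needs no extra locking. Several consumers would need a thread-safe handler.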

--------------
            var fundFamilies = new List<FundFamily>
                                   {
                                       new FundFamily {Id = 1, Code = "RE"},
                                       new FundFamily {Id = 2, Code = "NON-RE"},
                                       new FundFamily {Id = 3, Code = "PEQ"}
                                   };

            var funds = new List<Fund>
                            {
                                new Fund {Id = 1, Code = "MSREF1", FundFamilyId = 1},
                                new Fund {Id = 2, Code = "MSREF2", FundFamilyId = 1},
                                new Fund {Id = 3, Code = "EQ1", FundFamilyId = 2}
                            };
            var modelDatas = new List<ModelData>
                                 {
                                     new ModelData()
                                         {ModelCode = "BSP", ModelName = "Boston Seaport", FundCode = "MSREF1"},
                                     new ModelData()
                                         {ModelCode = "WBS", ModelName = "Woodbrook Sea", FundCode = "MSREF1"},
                                     new ModelData()
                                         {ModelCode = "EQTL", ModelName = "Private Tryport", FundCode = "EQ1"},
                                     new ModelData()
                                         {ModelCode = "EQTL1", ModelName = "Private Tryport2", FundCode = "EQ1"}
                                 };

            var query = from f in funds
                        join ff in fundFamilies on f.FundFamilyId equals ff.Id
                        select new {FundFamilyId = ff.Id, FundFamilyCode = ff.Code, Fundid = f.Id, FundCode = f.Code};
            //foreach (var bp in query)
            //    Console.WriteLine("Funds info  - {0}, {1}, {2}, {3}", bp.FundFamilyId, bp.FundFamilyCode, bp.Fundid, bp.FundCode) ;

            //Create ModelTransaction 
            var modelTransactions = new List<ModelTransaction>
                                        {
                                            new ModelTransaction()
                                                {ModelCode = "BSP", ModelName = "Boston Seaport", FundCode = "MSREF1"},
                                            new ModelTransaction()
                                                {ModelCode = "WBS", ModelName = "Woodbrook Sea", FundCode = "MSREF1"},
                                            new ModelTransaction()
                                                {ModelCode = "EQTL", ModelName = "Private Tryport", FundCode = "EQ1"},
                                            new ModelTransaction()
                                                {ModelCode = "EQTL1", ModelName = "Private Tryport2", FundCode = "EQ1"}
                                        };


            Console.WriteLine("--Before  Update----");
            // Print ModelCode first; the original printed FundFamilyId twice.
            foreach (ModelTransaction modelTransaction in modelTransactions)
                Console.WriteLine("Funds info  - {0}, {1}, {2}, {3}", modelTransaction.ModelCode,
                                  modelTransaction.FundCode, modelTransaction.FundId, modelTransaction.FundFamilyId);

            Console.WriteLine("--After Update----");
            var metaDataQuery = (from f in funds
                                 join ff in fundFamilies on f.FundFamilyId equals ff.Id
                                 select new {FundCode = f.Code, f.FundFamilyId, FundId = f.Id});
            //.Update(mt => { mt.FundFamilyId = 1 ; mt.FundId = 2; });
           
            //var updateModelTransactionsQuery = (
            //                                       from m in modelTransactions
            //                                       join f in funds on m.FundCode equals f.Code
            //                                       join ff in fundFamilies on f.FundFamilyId equals ff.Id
            //                                       select m);
            //foreach (var m in updateModelTransactionsQuery)
            //{

            //}
            // Look up each fund once per transaction, and skip transactions whose
            // FundCode has no match instead of dereferencing a null FirstOrDefault result.
            foreach (ModelTransaction modelTransaction in modelTransactions)
            {
                var fundInfo = metaDataQuery.FirstOrDefault(i => i.FundCode == modelTransaction.FundCode);
                if (fundInfo == null)
                    continue;
                modelTransaction.FundId = fundInfo.FundId;
                modelTransaction.FundFamilyId = fundInfo.FundFamilyId;
            }
            // Print ModelCode first; the original printed FundFamilyId twice.
            foreach (ModelTransaction modelTransaction in modelTransactions)
                Console.WriteLine("Funds info  - {0}, {1}, {2}, {3}", modelTransaction.ModelCode,
                                  modelTransaction.FundCode, modelTransaction.FundId, modelTransaction.FundFamilyId);


==============
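using System;
using System.Collections.Generic;
using System.Threading;

    public class WorkUnit
    {
        public int Id { get; set; }
        public string Code { get; set; }
        public string Name { get; set; }
    }

    public class WorkUnitManager
    {
        // Work units owned by this manager (not populated yet in this draft).
        private readonly List<WorkUnit> _tasks = new List<WorkUnit>();

        // Placeholder: model processing is not implemented yet.
        // The Model type is defined elsewhere.
        public bool ProcessModel(Model model)
        {
            return true;
        }
    }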

    // Requires: using System; using System.Collections.Generic; using System.Threading;
    public static class Program
    {
        // Counts outstanding work units so Main can wait for all of them.
        private static CountdownEvent _pending;

        public static void ProcessWorkUnit(Object stateInfo)
        {
            WorkUnit workUnit = (WorkUnit) stateInfo;
            try
            {
                Thread.Sleep(1000);   // simulate one second of work
                Console.WriteLine(workUnit.Name);
            }
            finally
            {
                _pending.Signal();
            }
        }

        public static void Main()
        {
            var workUnits = new List<WorkUnit>();
            for (int i = 0; i < 100; i++)
            {
                var workUnit = new WorkUnit {Id = i, Name = "work Unit " + i};
                workUnits.Add(workUnit);
            }
            _pending = new CountdownEvent(workUnits.Count);

            // Queue the task and data.
            foreach (var workUnit in workUnits)
                ThreadPool.QueueUserWorkItem(new WaitCallback(ProcessWorkUnit), workUnit);

            Console.WriteLine("Main thread does some work, then waits.");

            // ThreadPool uses background threads, which do not keep the
            // application running, so Main has to wait explicitly. A fixed
            // Thread.Sleep(1000) here is a race condition: the main thread
            // can exit before the queued work units have finished.
            _pending.Wait();

            Console.WriteLine("Main thread exits.");
        }
    }

====Parallel Processing
Source => Models

Processing Tasks
<= Synch Operations
Load() => Load List<Model> from a .csv file [in future, this can be pushed into a queue]
Transform() => Transform List<Model> to List<ModelTransaction> (a synchronous operation, but it can be executed with data parallelism)

<= Parallel Operations
Enrich() => Enrich List<ModelTransaction>
Compute() => Compute DailyPnL, MTDPnL, YTDPnL
Validate() =>
Validate List<ModelTransaction> for Enrichment
Validate List<ModelTransaction> for Computations
Persist() =>
Write modelTransactions  using SQLBulkCopy

Target => ModelTransactions

=> Implement the following using ThreadPool.QueueUserWorkItem
=> Implement the following using Task parallelism
=> Plot the performance difference: I/O, CPU and memory utilization, ...
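
For the Task parallelism variant, the Enrich() and Validate() stages could look roughly like the sketch below. It uses Parallel.ForEach because each ModelTransaction is updated independently. The Pipeline class, the simplified Fund and ModelTransaction shapes, and the rule that FundId == 0 means "not enriched" are assumptions for illustration. Compute() and Persist() are left out because the PnL formulas and the target table are not specified in these notes.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public class Fund
{
    public int Id { get; set; }
    public string Code { get; set; }
    public int FundFamilyId { get; set; }
}

public class ModelTransaction
{
    public string ModelCode { get; set; }
    public string FundCode { get; set; }
    public int FundId { get; set; }
    public int FundFamilyId { get; set; }
}

public static class Pipeline
{
    // Enrich(): resolves FundId and FundFamilyId from FundCode, in parallel.
    public static void Enrich(List<ModelTransaction> transactions, List<Fund> funds)
    {
        // Built once and only read afterwards, so concurrent lookups are safe.
        Dictionary<string, Fund> fundsByCode = funds.ToDictionary(f => f.Code);

        Parallel.ForEach(transactions, t =>
        {
            Fund fund;
            if (fundsByCode.TryGetValue(t.FundCode, out fund))
            {
                t.FundId = fund.Id;
                t.FundFamilyId = fund.FundFamilyId;
            }
        });
    }

    // Validate() for enrichment: returns the transactions whose fund was not resolved.
    // Assumes real fund Ids start at 1, so 0 means "not enriched".
    public static List<ModelTransaction> ValidateEnrichment(List<ModelTransaction> transactions)
    {
        return transactions.Where(t => t.FundId == 0).ToList();
    }
}

public static class PipelineDemo
{
    public static void Main()
    {
        var funds = new List<Fund>
        {
            new Fund {Id = 1, Code = "MSREF1", FundFamilyId = 1},
            new Fund {Id = 3, Code = "EQ1", FundFamilyId = 2}
        };
        var transactions = new List<ModelTransaction>
        {
            new ModelTransaction {ModelCode = "BSP", FundCode = "MSREF1"},
            new ModelTransaction {ModelCode = "EQTL", FundCode = "EQ1"},
            new ModelTransaction {ModelCode = "X", FundCode = "UNKNOWN"}
        };

        Pipeline.Enrich(transactions, funds);
        List<ModelTransaction> failed = Pipeline.ValidateEnrichment(transactions);
        Console.WriteLine("{0} enriched, {1} failed validation",
                          transactions.Count - failed.Count, failed.Count);
    }
}
```

The same Enrich body can be queued per item with ThreadPool.QueueUserWorkItem, which makes the two approaches directly comparable when measuring.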

ModelRepository
Load()
Transform()

TaskRepository
ComputeTask
EnrichTask
ValidateTask
PersistTask
