All Classes Namespaces Functions Variables Typedefs Enumerations Enumerator Friends
include/N3AnaBase/utils/thread_mgr.h
00001 /*
00002   Multithread Manager
00003   06/2010, Hugon Christophe, CENBG
00004 
00005   thread_mgr is a manager that speeds up the processing of an ana_event file into another ana_event file by using several threads.
00006   To use it you need to define a 'routine' like:
00007 
00008   #include <N3AnaBase/measurements/tof_eMgN_measurement.h>
00009 
00010   void* a_thread_example(void* data_)
00011   {
00012 
00013   //getting the data from the pointer (mandatory)
00014   evt_coll the_data = (evt_coll) data_;
00015 
00016   //initializing the tof_calculator (for instance)
00017   nemo3::tof_eMgN_measurement the_tof ;
00018   the_tof.set_debug(true);
00019   the_tof.set_ECorr(true);
00020 
00021   for (evt_coll_it evt_it = the_data->begin () ; evt_it != the_data->end () ; ++evt_it)
00022   {
00023   the_tof.do_measurement(**evt_it);
00024   }
00025 
00026   return NULL;
00027   }
00028 
00029   Then pass it to the thread_mgr object as the 'routine_' argument, together with the input/output file paths and the number of threads you want.
00030   Then launch it with Process().
00031   If you need more arguments, you have to use global variables declared before the #include of thread_mgr.h.
00032 */
00033 
00034 #ifndef __N3AnaBase__thread_mgr
00035 #define __N3AnaBase__thread_mgr 1
00036 
00037 #define app_name "thread manager: " // message prefix string; NOTE(review): not referenced anywhere in this header, presumably used by the implementation - confirm. As a macro it is visible to every file that includes this header.
00038 
00039 #include <ctime>
00040 #include <pthread.h>
00041 #include <string>
00042 #include <list>
00043 #include <vector>
00044 
00045 using namespace std;
00046 
00047 // Utilities:
00048 #include <datatools/utils/utils.h>
00049 #include <datatools/utils/ioutils.h>
00050 #include <NemoTools/MyProperty.h>
00051 #include <datatools/utils/properties.h>
00052 #include <N3AnaBase/utils/convert_ana.h>
00053 
00054 // N3AnaBase data model:
00055 #include <N3AnaBase/event/ana_event.h>
00056 #include <N3AnaBase/event/io.h>
00057 #include <N3IORoot/N3AnaDataReader.h>
00058 
00059 // data model:
00060 #include <N3Event/N3RawEvent.h>
00061 #include <N3Event/N3StdCalibEvent.h>
00062 #include <N3Event/N3StdTrackedEvent.h>
00063 #include <N3Arc4se/ConstActivityModel.hh>
00064 
00065 // I/O:
00066 #include <N3IORoot/N3RawDataReader.h>
00067 #include <N3IORoot/N3AnaDataWriter.h>
00068 
00069 // apply real running conditions:
00070 #include <N3Arc4se/RunType.hh>
00071 #include <N3Arc4se/Arc4seMgr.hh>
00072 
00073 // calibration:
00074 #include <N3Calib/N3StdCalibMgr.h>
00075 
00076 // database:
00077 #include <N3DbMgr.h>
00078 
00079 // tracking:
00080 #include <N3TrackLAL/N3LALTracker.h>
00081 
00082 #include <N3AnaBase/event/event_utils.h>
00083 #include <N3AnaBase/event/sim_event_utils.h>
00084 
00085 
00086 // cuts' factory:
00087 #include <N3AnaBase/cuts/cut_factory.h>
00088 
00089 using namespace datatools::utils;
00090 
00091 
00092 namespace nemo3 {
00093   // Namespace-scope pthread mutexes, statically initialised with PTHREAD_MUTEX_INITIALIZER.
00094   // NOTE(review): these are definitions (not extern declarations) inside a header, so every translation unit that includes it defines them again - confirm the header is included from a single .cpp only.
00095   pthread_mutex_t calib_mutex = PTHREAD_MUTEX_INITIALIZER; // calibration mutex
00096   pthread_mutex_t simu_mutex = PTHREAD_MUTEX_INITIALIZER; // process-raw mutex
00097   pthread_mutex_t select_mutex = PTHREAD_MUTEX_INITIALIZER; // event-selection mutex
00098   pthread_mutex_t precal_mutex = PTHREAD_MUTEX_INITIALIZER; // NOTE(review): undocumented; presumably a pre-calibration step - confirm
00099   pthread_mutex_t postcal_mutex = PTHREAD_MUTEX_INITIALIZER; // NOTE(review): undocumented; presumably a post-calibration step - confirm
00100   pthread_mutex_t convert_mutex = PTHREAD_MUTEX_INITIALIZER; // NOTE(review): undocumented; presumably used around event conversion - confirm
00101   pthread_mutex_t counter_mutex = PTHREAD_MUTEX_INITIALIZER; // NOTE(review): undocumented; presumably protects thread_mgr::__counter - confirm
00102 
00103   // Aliases intended for user routines. Note that evt_coll is a POINTER to a list, not the list itself.
00104   typedef list<nemo3::ana_event*>* evt_coll; // pointer to a list of ana_event pointers
00105   typedef list<nemo3::ana_event*>::iterator evt_coll_it; // iterator over the list that an evt_coll points to
00106 
00107 
00108 
00109   //------------------------------------------------------------------------------------routines' struct
00110   struct data_writer { // NOTE(review): presumably the payload passed through the void* argument of thread_mgr::writer_routine - confirm in the implementation
00111     list<list<nemo3::ana_event*> > event_buffer; // list of event-pointer lists; pointers are stored to avoid copying the events
00112     pthread_mutex_t to_mutex; // no initialiser here: whoever creates the struct must initialise it before use
00113     pthread_cond_t to_cond; // no initialiser here either; NOTE(review): presumably waited on together with to_mutex (cf. thread_mgr::WaitForIt) - confirm
00114 
00115   };
00116 
00117   struct data_reader { // NOTE(review): presumably the payload passed through the void* argument of thread_mgr::reader_routine - confirm in the implementation
00118     list<pthread_t> sub_thread; // pthread handles; NOTE(review): presumably the threads spawned by the reader - confirm
00119     list<list<nemo3::ana_event*> > event_buffer; // list of event-pointer lists
00120     pthread_mutex_t to_mutex; // no initialiser here: must be initialised by the creator before use
00121     pthread_cond_t to_cond; // no initialiser here: must be initialised by the creator before use
00122     long start; // NOTE(review): units and meaning not visible here; thread_mgr::WaitForIt takes a long start_ - presumably the same value, confirm
00123   };
00124 
00125   struct data_convert { // NOTE(review): presumably the payload passed through the void* argument of thread_mgr::convert_routine - confirm in the implementation
00126     list<nemo3::ana_event*> ana_buffer; // pointers to ana_event objects
00127     list<nemo3::StdTrackedEvent*> raw_buffer; // pointers to StdTrackedEvent objects; NOTE(review): presumably the conversion input - confirm
00128     pthread_mutex_t to_mutex; // no initialiser here: must be initialised by the creator before use
00129     pthread_cond_t to_cond; // no initialiser here: must be initialised by the creator before use
00130   };
00131 
00132   struct data_analyze { // NOTE(review): presumably the payload handed to the user routine - confirm in the implementation
00133   list<nemo3::ana_event*> *ana_buffer; // pointer to a list of event pointers (same type as the evt_coll typedef); not initialised here
00134     pthread_mutex_t to_mutex; // no initialiser here: must be initialised by the creator before use
00135     pthread_cond_t to_cond; // no initialiser here: must be initialised by the creator before use
00136   };
00137 
00138   class thread_mgr {
00139     // Runs a user routine over ana_event data with several threads (see the usage note at the top of this header). Constructor and destructor are private; the static GetThread_mgr() hands out the thread_mgr*.
00140   public :
00141     static thread_mgr* GetThread_mgr(string file_in_="", string file_out_="", int NbThread_=-1, void *(*routine_)(void *)=NULL); // static accessor returning a thread_mgr*; every argument is optional
00142     void Close(); // NOTE(review): implementation not visible here; presumably releases the instance - confirm
00143     void SetDebug(bool debug_);
00144     void SetWrite(bool write_); // for callers that do not want to write
00145     void SetBufferSize(int buffer_size_);
00146     void SetPR(bool PR_); // PR = process raw (see the note on __PR)
00147     void SetStat(bool stat_);
00148     void SetSimulated(string list_file_, double activity_, int run_type_ = nemo3::RunType::BETABETA); // run_type_ defaults to nemo3::RunType::BETABETA
00149     void SetFalsifiedRunNb (int rn_) {__falsify_run_nb=rn_;} // inline: stores rn_ in the static __falsify_run_nb
00150 
00151     void SetSequential(bool sequen_);
00152     void SetNbEvt (unsigned int nb_);
00153     bool WaitForIt(pthread_t &thread_, pthread_mutex_t &to_mutex_, pthread_cond_t &to_cond_, long start_); // NOTE(review): parameters mirror the to_mutex/to_cond/start fields of the data_* structs; presumably a timed wait on thread_ - confirm
00154     void Lock(std::string); // NOTE(review): presumably locks the mutex stored under this name in __mutex_lst - confirm
00155     void Unlock(std::string); // NOTE(review): presumably the matching unlock - confirm
00156 
00157   private :
00158     bool InitArc4se(long seed_);
00159     void __Lock(std::string);
00160     void __Unlock(std::string);
00161     void __CleanOutLock();
00162     void __CleanIt (list<list<nemo3::ana_event*> > &event_buffer); // takes the buffer by reference
00163     // Boolean switches below; NOTE(review): presumably stored in the matching __AcceptOnlyGood / __Degrad* / __Delete* members - confirm
00164   public :
00165     void SetAcceptOnlyGoodRunStatus (bool);
00166     void SetApplyReal (bool);
00167     void SetDegradPMTs(bool);
00168     void SetDeleteOffPMTs(bool);
00169     void SetDeleteHotPMTs(bool);
00170     void SetDegradGGs(bool);
00171     void SetDeleteOffGGs(bool);
00172     void SetDeleteHotGGs(bool);
00173     void SetRunType(int rt_){__run_type=rt_;} // inline: stores rt_ in __run_type
00174 
00175   public :
00176     void SetReal();
00177     void SetFileIn(string file_in_);
00178     void SetFileOut(string file_out_);
00179     void SetRoutine(void *(*routine_)(void *)); // routine_ has the void* (*)(void*) shape shown in the usage note at the top of this header
00180     bool Process(string file_in_="", string file_out_="", int NbThread_ = -1, void *(*routine_)(void *) = NULL); // launches the processing (see the usage note at the top of this header); every argument is optional
00181     bool SelectEvent(nemo3::ana_event *event_, string cut_file_, string cut_name_);
00182     void KillEvt(evt_coll the_coll, list<nemo3::ana_event*>::iterator evt_it); // the_coll is a pointer to the list, evt_it an iterator of the evt_coll_it type
00183 
00184   private:
00185     static void* writer_routine(void* data_); // static with a void* (*)(void*) signature; NOTE(review): presumably a pthread start routine receiving a data_writer* - confirm
00186     static void* convert_routine(void* data_); // NOTE(review): presumably receives a data_convert* - confirm
00187     static void* reader_routine(void* data_); // NOTE(review): presumably receives a data_reader* - confirm
00188 
00189 
00190 
00191   private :
00192     thread_mgr(string file_in_="", string file_out_="", int NbThread_ = 1, void *(*routine_)(void *) = NULL); // private; NbThread_ defaults to 1 here but to -1 in GetThread_mgr() and Process()
00193     ~thread_mgr(); // private: outside code cannot destroy or stack-allocate a thread_mgr
00194     thread_mgr& operator= (const thread_mgr&); // copy-assignment made private. NOTE(review): no copy constructor is declared, so the implicitly generated public one still exists - confirm whether it should be declared private as well
00195 
00196     typedef struct { // bundle of three cut-related pointers; used as the mapped type of __the_cutter
00197       nemo3::cut_factory *cut_factory;
00198       nemocuts::cut_manager *cut_manager;
00199       nemocuts::ICut *selector;
00200     } cut_struct;
00201 
00202   private :
00203     static thread_mgr* Instance; // NOTE(review): presumably the object returned by GetThread_mgr() - confirm
00204     // Static members below are shared by the whole process rather than stored per object. NOTE(review): identifiers containing a double underscore are reserved to the implementation in C++.
00205     static bool __debug;
00206     static bool __write;
00207     static bool __PR; // for process raw
00208     static short __format;
00209     static int __buffer_size;
00210     static int __NbThread;
00211     static int __counter;
00212     static int  __falsify_run_nb; // written by SetFalsifiedRunNb()
00213     static unsigned int __evt_max;
00214     static nemo3::Arc4seMgr *__arcase_mgr;
00215     static nemo3::StdCalibMgr *__calib_mgr;
00216     static nemo3::LALTracker *__tracker_mgr;
00217     static nemo3::cut_factory *__cut_factory;
00218     static nemocuts::cut_manager *__cut_manager;
00219     static nemocuts::ICut *__selector;
00220     static std::map<string,cut_struct> __the_cutter; // NOTE(review): std::map is used but <map> is not among the standard headers this file includes directly - presumably pulled in transitively, confirm
00221     static nemo3::io::store_mgr *__writer;
00222     static void *__reader; // untyped pointer; the concrete reader type is not visible in this header
00223     static vector <nemocuts::ICut> __selectors; // NOTE(review): holds ICut BY VALUE while the other members hold ICut*; if ICut is a polymorphic interface this would slice or fail to instantiate - confirm
00224 
00225     static bool __simulated; // static, unlike the members that follow
00226     bool __select;
00227     bool __sequen;
00228     bool __stat;
00229     bool __DegradPMTs;
00230     bool __DeleteOffPMTs;
00231     bool __DeleteHotPMTs;
00232     bool __DegradGGs;
00233     bool __DeleteOffGGs;
00234     bool __DeleteHotGGs;
00235     bool __AcceptOnlyGood;
00236     int __run_type; // written by SetRunType()
00237     double __activity;
00238     string __file_in;
00239     string __file_out;
00240     string __list_file;
00241     string __cut_file;
00242     string __cut_name;
00243     void *(*__routine)(void*); // function pointer with the same shape as the routine_ arguments above
00244     struct timeval __now; // NOTE(review): struct timeval normally comes from <sys/time.h>, which this header does not include directly - confirm
00245     struct timespec __wait;
00246     long __waiting[3]; // NOTE(review): meaning of the three slots is not visible here
00247 
00248     std::map <std::string,pthread_mutex_t> __mutex_lst; // mutexes keyed by name; NOTE(review): presumably what Lock()/Unlock() look up - confirm
00249     pthread_mutex_t __map_mutex; // NOTE(review): presumably guards __mutex_lst - confirm
00250 
00251   };
00252 }//end of namespace nemo3
00253 
00254 
00255 #endif