SHORE API
parallel.hpp

/*
 * Copyright 2008,2009,2010,2011,2012 Stephan Ossowski, Korbinian Schneeberger,
 * Felix Ott, Joerg Hagmann, Alf Scotland, Sebastian Bender
 *
 * This file is part of SHORE.
 *
 * SHORE is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * SHORE is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with SHORE. If not, see <http://www.gnu.org/licenses/>.
 */

#ifndef SHORE_PARALLEL_PARALLEL_HPP__
#define SHORE_PARALLEL_PARALLEL_HPP__

// <algorithm>, <set>, <sstream> and <boost/bind.hpp> are required for
// std::max, std::set, std::ostringstream and boost::bind used below.
#include <algorithm>
#include <iostream>
#include <deque>
#include <sstream>
#include <stdexcept>
#include <map>
#include <set>
#include <vector>

#include <boost/bind.hpp>
#include <boost/thread/thread.hpp>
#include <boost/thread/recursive_mutex.hpp>
#include <boost/thread/condition.hpp>

#ifdef WITH_MPI
 #include <boost/serialization/deque.hpp>
#endif // WITH_MPI


#include "shore/base/memops.hpp"

namespace shore {

class parallelization_core_access;

// Callback interface used to defer connection setup between pipeline
// elements until the number of threads is known.
class parallel_connection_init
{
  public:

    virtual ~parallel_connection_init() {}
    virtual void init()=0;
};

// Common base of all parallelization-aware pipeline elements; tracks the
// number of threads and re-runs the derived element's setup whenever the
// count changes.
class parallelization_base
{
  protected:

    virtual void parallelization_setup()=0;

    int m_parallelization_numthreads;

  private:

    friend class parallelization_core_access;

    std::vector<parallel_connection_init *> m_pci;

    shore::signal<int> m_signumthreads;

    shore::slot<int> m_slotnumthreads;


    void slotfun_nthreads(const int &nthreads)
    {
      // under MPI every process needs at least one thread
      if(shore::mpi::comm_size()>1)
        m_parallelization_numthreads=std::max(1,nthreads);
      else
        m_parallelization_numthreads=std::max(0,nthreads);

      parallelization_setup();
      m_signumthreads.emit(m_parallelization_numthreads);

      for(size_t i=0;i<m_pci.size();++i)
        m_pci[i]->init();
    }

    parallelization_base(const parallelization_base &);
    parallelization_base &operator=(const parallelization_base &);

  public:

    parallelization_base()
    :m_parallelization_numthreads(0), // avoid reading an uninitialized count
     m_slotnumthreads(boost::bind(&parallelization_base::slotfun_nthreads,this,_1))
    {}

    virtual ~parallelization_base()
    {
      while(!m_pci.empty())
      {
        delete m_pci.back();
        m_pci.pop_back();
      }
    }

    int num_threads() const
    {
      return m_parallelization_numthreads;
    }
};

// Pipeline element wrapping a single instance of T that exists only on the
// MPI root process; it never instantiates the
// wrapped object outside the root process.
template<typename T>
class serial
:public parallelization_base
{
  private:

    friend class parallelization_core_access;

    typedef T object_type;

    class lock_link
    {
      private:

        shore::slot<int> m_slotpostlock;
        shore::slot<int> m_slotpreunlock;
        shore::slot<int> m_slotpostunlock;

        shore::slot<void> m_slotflushthread;

        shore::signal<int> m_sigpostlock;
        shore::signal<int> m_sigpreunlock;
        shore::signal<int> m_sigpostunlock;

        shore::signal<void> m_sigflushthread;

      public:

        lock_link()
        :m_slotpostlock(boost::ref(m_sigpostlock.emit)),
         m_slotpreunlock(boost::ref(m_sigpreunlock.emit)),
         m_slotpostunlock(boost::ref(m_sigpostunlock.emit)),
         m_slotflushthread(boost::ref(m_sigflushthread.emit))
        {}

        shore::slot<int> &slotpostlock()
        {
          return m_slotpostlock;
        }

        shore::slot<int> &slotpreunlock()
        {
          return m_slotpreunlock;
        }

        shore::slot<int> &slotpostunlock()
        {
          return m_slotpostunlock;
        }

        shore::slot<void> &slotflushthread()
        {
          return m_slotflushthread;
        }

        shore::signal<int> &sigpostlock()
        {
          return m_sigpostlock;
        }

        shore::signal<int> &sigpreunlock()
        {
          return m_sigpreunlock;
        }

        shore::signal<int> &sigpostunlock()
        {
          return m_sigpostunlock;
        }

        shore::signal<void> &sigflushthread()
        {
          return m_sigflushthread;
        }
    };

    object_type *m_object;
    std::vector<lock_link *> m_locklinks;

    shore::slot<void> m_slotjoin;
    shore::signal<void> m_sigjoin;


    serial(const serial &);
    serial &operator=(const serial &);

  protected:

    virtual void parallelization_setup()
    {
      while(!m_locklinks.empty())
      {
        delete m_locklinks.back();
        m_locklinks.pop_back();
      }

      for(int i=0;i<num_threads();++i)
        m_locklinks.push_back(new lock_link);
    }

  public:

    serial()
    :m_slotjoin(boost::ref(m_sigjoin.emit))
    {
      if(shore::mpi::comm_isroot())
        m_object=new object_type;
      else
        m_object=0;
    }

    template<typename A>
    serial(const A &arg)
    :m_slotjoin(boost::ref(m_sigjoin.emit))
    {
      if(shore::mpi::comm_isroot())
        m_object=new object_type(arg);
      else
        m_object=0;
    }

    template<typename A1,typename A2>
    serial(const A1 &arg1,const A2 &arg2)
    :m_slotjoin(boost::ref(m_sigjoin.emit))
    {
      if(shore::mpi::comm_isroot())
        m_object=new object_type(arg1,arg2);
      else
        m_object=0;
    }

    template<typename A1,typename A2,typename A3>
    serial(const A1 &arg1,const A2 &arg2,const A3 &arg3)
    :m_slotjoin(boost::ref(m_sigjoin.emit))
    {
      if(shore::mpi::comm_isroot())
        m_object=new object_type(arg1,arg2,arg3);
      else
        m_object=0;
    }

    ~serial()
    {
      while(!m_locklinks.empty())
      {
        delete m_locklinks.back();
        m_locklinks.pop_back();
      }

      delete m_object;
    }

    // Invoke a member function on the wrapped object; on non-root processes
    // this is a no-op returning a default-constructed R.
    template<typename R>
    R call(R (object_type::*func)())
    {
      if(m_object)
        return (m_object->*func)();
      return R();
    }

    template<typename R,typename A>
    R call(R (object_type::*func)(const A &),const A &arg)
    {
      if(m_object)
        return (m_object->*func)(arg);
      return R();
    }

    template<typename R,typename A>
    R call(R (object_type::*func)(const A),const A &arg)
    {
      if(m_object)
        return (m_object->*func)(arg);
      return R();
    }
};
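
/*
 * Usage sketch (illustrative only; "line_sink" stands for a hypothetical
 * shore::pipe_facade-based stage with set_header() and dump() members, it
 * is not part of this header):
 *
 *   shore::serial<line_sink> sink("out.txt");            // root only
 *   sink.call(&line_sink::set_header,std::string("#"));  // no-op elsewhere
 */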

// Pipeline element holding one instance of T per thread; instances are
// created through a small factory so that constructor arguments can be
// replayed for every thread.
template<typename T>
class parallel
:public parallelization_base
{
  private:

    friend class parallelization_core_access;

    typedef T object_type;


    struct factory_base
    {
      virtual ~factory_base() {}
      virtual object_type *create()=0;
    };

    template<typename Obj>
    struct arg0_factory
    :public factory_base
    {
      virtual Obj *create()
      {
        return new Obj;
      }
    };

    template<typename Obj,typename Arg>
    struct arg1_factory
    :public factory_base
    {
      Arg arg;

      arg1_factory(const Arg &a)
      :arg(a)
      {}

      virtual Obj *create()
      {
        return new Obj(arg);
      }
    };


    std::vector<object_type *> m_objects;
    factory_base *m_factory;

    shore::slot<void> m_slotjoin;
    shore::signal<void> m_sigjoin;

  protected:

    virtual void parallelization_setup()
    {
      while(!m_objects.empty())
      {
        delete m_objects.back();
        m_objects.pop_back();
      }

      // always create at least one instance, plus one per additional thread
      m_objects.push_back(m_factory->create());
      for(int i=1;i<num_threads();++i)
        m_objects.push_back(m_factory->create());
    }


  public:

    parallel()
    :m_slotjoin(boost::ref(m_sigjoin.emit))
    {
      m_factory=new arg0_factory<object_type>;
    }

    template<typename A>
    parallel(const A &arg)
    :m_slotjoin(boost::ref(m_sigjoin.emit))
    {
      m_factory=new arg1_factory<object_type,A>(arg);
    }

    ~parallel()
    {
      while(!m_objects.empty())
      {
        delete m_objects.back();
        m_objects.pop_back();
      }
      delete m_factory;
    }
};
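
/*
 * Usage sketch (illustrative only; "filter_stage" is a hypothetical
 * pipe_facade-based stage). One instance per thread is created once the
 * thread count has been propagated through the pipeline:
 *
 *   shore::parallel<filter_stage> filters(min_quality); // arg replayed
 *   // wired to its neighbours with operator|, see the operators below
 */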

// Funnels the per-thread streams of a parallel section back into a single,
// mutex-protected stream; with MPI, records submitted by non-root processes
// are forwarded to proxy tracks on the root.
template<typename T>
class sync
:public parallelization_base
{
  public:

    typedef T append_type;
    typedef T current_type;

  private:

    friend class parallelization_core_access;

    typedef boost::recursive_mutex mutex_type;
    typedef boost::recursive_mutex::scoped_lock lock_type;
    typedef boost::condition condition_type;

    class track;


    mutex_type m_mutex;
    condition_type m_flush_condition;

    shore::slot<void> m_slotjoin;
    shore::signal<void> m_sigjoin;

    std::set<int> m_flushed_threads;
    std::set<int> m_flushed_processes;
    shore::signal<void> m_sigsyncflush;

    std::vector<track *> m_thread_tracks;
    std::vector<track *> m_proxy_tracks;

    void slotfun_join()
    {
      #ifdef WITH_MPI
      if(shore::mpi::comm_isroot())
      {
        lock_type lock(m_mutex);

        // wait until every other MPI process has flushed
        while(m_flushed_processes.size()<(shore::mpi::comm_size()-1))
          m_flush_condition.wait(lock);

        m_data_submitter.join();
        m_flush_submitter.join();
      }
      #endif // WITH_MPI
      m_sigjoin.emit();
    }

    #ifdef WITH_MPI
    std::vector<shore::feed<append_type> > m_proxy_feeds;

    mpi_proxy::submitter<append_type> m_data_submitter;
    shore::slot<const append_type &> m_slotproxy_data;

    mpi_proxy::submitter<int> m_flush_submitter;
    shore::slot<const int &> m_slotproxy_flush;

    void slotfun_proxyappend(const append_type &d)
    {
      m_proxy_feeds[m_data_submitter.get_source()-1].append(d);
    }

    void slotfun_proxyflush(const int &)
    {
      m_proxy_feeds[m_flush_submitter.get_source()-1].flush();
      m_flush_condition.notify_all();
    }
    #endif // WITH_MPI


    sync(const sync &);
    sync &operator=(const sync &);


    // One input track per thread (and, on the root, one per proxied
    // process); appends are serialized through the shared mutex.
    class track
    :public shore::pipe_facade<track,append_type,current_type,true>
    {
      private:

        friend class shore::pipeline_core_access;
        friend class parallelization_core_access;

        sync &m_sync;
        lock_type *m_lock;

        int m_proxied_process;

        shore::signal<int> m_sigpostlock;
        shore::signal<int> m_sigpreunlock;
        shore::signal<int> m_sigpostunlock;

        track(const track &);
        track &operator=(const track &);


        void acquire_lock()
        {
          m_lock=new lock_type(m_sync.m_mutex);
          m_sigpostlock.emit(m_proxied_process);
        }

        void release_lock()
        {
          m_sigpreunlock.emit(m_proxied_process);

          delete m_lock;
          m_lock=0;

          m_sigpostunlock.emit(m_proxied_process);
        }

        //void next() {}

        //void prepare(const append_type &d) {}

        void append(const append_type &d)
        {
          acquire_lock();

          try
          {

            #ifdef WITH_MPI
            if(!shore::mpi::comm_isroot())
              m_sync.m_data_submitter.submit(d);
            #endif // WITH_MPI

            this->emit(d);
          }
          catch(...)
          {
            delete m_lock;
            m_lock=0;
            throw;
          }

          release_lock();
        }

        void flush()
        {
          if(m_sync.num_threads()==0)
            return;

          // lock, check if all threads are flushed;
          // if so, flush the sync'ed range using this thread
          acquire_lock();

          try
          {
            if(m_proxied_process==0)
              m_sync.m_flushed_threads.insert(thread_index::id());
            #ifdef WITH_MPI
            else
              m_sync.m_flushed_processes.insert(m_proxied_process);
            #endif // WITH_MPI

            const int unflushed_threads=
              m_sync.num_threads()-int(m_sync.m_flushed_threads.size());

            int unflushed_processes=0;

            #ifdef WITH_MPI
            // Also need to have the other MPI processes flushed
            unflushed_processes=
              shore::mpi::comm_size()-int(m_sync.m_flushed_processes.size())-1;
            #endif // WITH_MPI

            if(shore::mpi::comm_isroot())
            {
              if((unflushed_threads==0)&&(unflushed_processes==0))
                m_sync.m_sigsyncflush.emit();
            }
            #ifdef WITH_MPI
            else if(unflushed_threads==0)
              m_sync.m_flush_submitter.submit(0);
            #endif // WITH_MPI

          }
          catch(...)
          {
            delete m_lock;
            m_lock=0;
            throw;
          }

          release_lock();

          // now unlocked and pipe_facade will flush this thread
        }


      public:

        track(sync &s,const int pr)
        :m_sync(s),
         m_lock(0),
         m_proxied_process(pr)
        {}
    };

  protected:

    virtual void parallelization_setup()
    {
      while(!m_thread_tracks.empty())
      {
        delete m_thread_tracks.back();
        m_thread_tracks.pop_back();
      }

      m_thread_tracks.push_back(new track(*this,0));
      for(int i=1;i<num_threads();++i)
        m_thread_tracks.push_back(new track(*this,0));
    }

  public:

    sync()
    :m_slotjoin(boost::bind(&sync::slotfun_join,this))
    #ifdef WITH_MPI
    ,
     m_slotproxy_data(boost::bind(&sync::slotfun_proxyappend,this,_1)),
     m_slotproxy_flush(boost::bind(&sync::slotfun_proxyflush,this,_1))
    #endif // WITH_MPI
    {
      #ifdef WITH_MPI
      if(shore::mpi::comm_isroot())
      {
        m_data_submitter.sigdata().connect(m_slotproxy_data);
        m_flush_submitter.sigdata().connect(m_slotproxy_flush);

        m_proxy_feeds.resize(shore::mpi::comm_size()-1);
        for(size_t i=1;i<shore::mpi::comm_size();++i)
        {
          m_proxy_tracks.push_back(new track(*this,i));
          m_proxy_feeds[i-1]|(*m_proxy_tracks.back());
        }
      }
      #endif // WITH_MPI
    }

    ~sync()
    {
      while(!m_thread_tracks.empty())
      {
        delete m_thread_tracks.back();
        m_thread_tracks.pop_back();
      }
      #ifdef WITH_MPI
      while(!m_proxy_tracks.empty())
      {
        delete m_proxy_tracks.back();
        m_proxy_tracks.pop_back();
      }
      m_data_submitter.unsubscribe();
      m_flush_submitter.unsubscribe();
      #endif // WITH_MPI
    }

    track &operator[](const size_t i)
    {
      return *m_thread_tracks[i];
    }
};
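
/*
 * Usage sketch (illustrative only; "filter_stage" and "line_sink" are
 * hypothetical pipe_facade-based stages). sync makes a downstream serial
 * stage see one mutex-protected stream:
 *
 *   shore::parallel<filter_stage> filters;
 *   shore::sync<record> syn;
 *   shore::serial<line_sink> sink("out.txt");
 *   filters|syn|sink;   // each worker appends through its own sync::track
 */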

// Counterpart of sync: distributes the single stream of a locked serial
// section back onto per-thread buffers (and, on the MPI root, per-process
// buffers), driven by the upstream element's lock/unlock signals.
template<typename T>
class desync
:public parallelization_base
{
  private:

    friend class parallelization_core_access;


    typedef T append_type;
    typedef T current_type;

    typedef boost::recursive_mutex mutex_type;
    typedef boost::recursive_mutex::scoped_lock lock_type;


    class lock_buffer;
    class multiplexer;

    shore::slot<void> m_slotjoin;
    shore::signal<void> m_sigjoin;

    mutex_type m_mutex;

    multiplexer m_multiplexer;

    // one buffer per local thread
    std::vector<lock_buffer *> m_thread_tracks;
    // one buffer per remote process (root only)
    std::vector<lock_buffer *> m_process_tracks;

    // maps thread ids to buffer indices, assigned on first use
    std::map<boost::thread::id,int> m_bufmapping;

    // pass the join signal on downstream
    void slotfun_join()
    {
      m_sigjoin.emit();
    }

    #ifdef WITH_MPI
    shore::slot<std::deque<append_type> &> m_slotproxy_getdata;
    mpi_proxy::requester<std::deque<append_type> > m_data_requester;

    void slotfun_proxygetdata(std::deque<append_type> &d)
    {
      const int pr=m_data_requester.get_target();
      lock_buffer &b=get_lockbuffer(pr);
      d.swap(b.buffer());
      b.buffer().clear();
    }
    #endif // WITH_MPI


    lock_buffer &get_lockbuffer(int process)
    {
      lock_type lock(m_mutex);
      if(process>0)
        return *m_process_tracks[process-1];

      // assign this thread the next free buffer on first use
      int &thr=m_bufmapping[boost::this_thread::get_id()];
      if(thr==0)
        thr=m_bufmapping.size();
      return *m_thread_tracks[thr-1];
    }

    class lock_buffer
    :public shore::pipe_facade<lock_buffer,append_type,current_type,true>
    {
      private:

        friend class shore::pipeline_core_access;
        friend class parallelization_core_access;
        friend class multiplexer;

        std::deque<append_type> m_buffer;

        void next()
        {
          m_buffer.pop_front();

          if(!m_buffer.empty())
            this->emit(m_buffer.front());
        }

        //void prepare(const append_type &d) {}

        void append(const append_type &d)
        {
          m_buffer.push_back(d);
        }

        //void flush() {}

      public:

        lock_buffer()
        {}

        std::deque<append_type> &buffer()
        {
          return m_buffer;
        }

        bool empty() const
        {
          return m_buffer.empty();
        }

        const append_type &front() const
        {
          return m_buffer.front();
        }

        void pop_front()
        {
          m_buffer.pop_front();
        }

        void dump()
        {
          if(!m_buffer.empty())
            this->emit(m_buffer.front());
        }
    };


    class multiplexer
    :public shore::pipe_facade<multiplexer,append_type,current_type,true>
    {
      private:

        friend class shore::pipeline_core_access;
        friend class parallelization_core_access;


        desync &m_desync;

        shore::slot<int> m_slotpostlock;
        shore::slot<int> m_slotpreunlock;
        shore::slot<int> m_slotpostunlock;

        void slotfun_postlock(int pr)
        {
          lock_buffer &b=m_desync.get_lockbuffer(pr);
          (*this)|b;

          #ifdef WITH_MPI
          if(!shore::mpi::comm_isroot())
            m_desync.m_data_requester.request(b.buffer());
          #endif // WITH_MPI
        }

        void slotfun_preunlock(int pr)
        {
          this->disconnect_outputs();
        }

        void slotfun_postunlock(int pr)
        {
          // if pr!=0, i.e. the data belongs to a different process,
          // leave it in the buffer to be retrieved remotely by
          // the process' slotfun_postlock, and subsequently
          // dumped there.
          if(pr==0)
            m_desync.get_lockbuffer(pr).dump();
        }

        //void next() {}

        //void prepare(const append_type &d) {}

        void append(const append_type &d)
        {
          this->emit(d);
        }

        //void flush() {}

      public:

        multiplexer(desync &d)
        :m_desync(d),
         m_slotpostlock(boost::bind(&multiplexer::slotfun_postlock,this,_1)),
         m_slotpreunlock(boost::bind(&multiplexer::slotfun_preunlock,this,_1)),
         m_slotpostunlock(boost::bind(&multiplexer::slotfun_postunlock,this,_1))
        {}

        shore::slot<int> &slotpostlock()
        {
          return m_slotpostlock;
        }

        shore::slot<int> &slotpreunlock()
        {
          return m_slotpreunlock;
        }

        shore::slot<int> &slotpostunlock()
        {
          return m_slotpostunlock;
        }
    };


    desync(const desync &);
    desync &operator=(const desync &);

  protected:

    virtual void parallelization_setup()
    {
      while(!m_thread_tracks.empty())
      {
        delete m_thread_tracks.back();
        m_thread_tracks.pop_back();
      }

      for(int i=0;i<num_threads();++i)
        m_thread_tracks.push_back(new lock_buffer);
    }

  public:

    desync()
    :m_slotjoin(boost::bind(&desync::slotfun_join,this)),
     m_multiplexer(*this)
    #ifdef WITH_MPI
    ,
     m_slotproxy_getdata(boost::bind(&desync::slotfun_proxygetdata,this,_1))
    #endif // WITH_MPI
    {
      if(shore::mpi::comm_isroot())
        for(size_t i=1;i<shore::mpi::comm_size();++i)
          m_process_tracks.push_back(new lock_buffer);
      #ifdef WITH_MPI
      m_data_requester.siggetdata().connect(m_slotproxy_getdata);
      #endif // WITH_MPI
    }

    ~desync()
    {
      while(!m_thread_tracks.empty())
      {
        delete m_thread_tracks.back();
        m_thread_tracks.pop_back();
      }
      while(!m_process_tracks.empty())
      {
        delete m_process_tracks.back();
        m_process_tracks.pop_back();
      }
      #ifdef WITH_MPI
      m_data_requester.join();
      m_data_requester.unsubscribe();
      #endif // WITH_MPI
    }
};
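
/*
 * Usage sketch (illustrative only; the mirror image of the sync example
 * above, with hypothetical stages "annotator" and "writer_stage"):
 *
 *   shore::serial<annotator> ann;
 *   shore::desync<record> dsc;
 *   shore::parallel<writer_stage> outs;
 *   ann|dsc|outs;  // lock/unlock signals drive the multiplexer's rewiring
 */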

// Distributes an incoming stream over a pool of worker threads: the
// multiplexer buffers appended records, worker threads fetch them in
// batches and emit them downstream; with MPI, non-root processes request
// their batches from the root.
template<typename T>
class parallelizer
:public parallelization_base
{
  private:

    friend class parallelization_core_access;

    typedef T append_type;
    typedef T current_type;

    typedef boost::recursive_mutex mutex_type;
    typedef boost::recursive_mutex::scoped_lock lock_type;
    typedef boost::condition condition_type;

    class multiplexer
    :public shore::pipe_facade<multiplexer,append_type,current_type,true>
    {
      private:

        friend class shore::pipeline_core_access;

        parallelizer &m_parallelizer;


        multiplexer(const multiplexer &);

        //void next() {}

        //void prepare(const append_type &d) {}

        void append(const append_type &d)
        {
          if(m_parallelizer.m_nthreads==0)
          {
            this->emit(d);
          }
          else
          {
            m_parallelizer.errcheck();

            lock_type lock(m_parallelizer.m_mutex);

            // wait until the previous batch has been picked up
            if(m_parallelizer.data_ready())
              m_parallelizer.m_dataretrieved_condition.wait(lock);

            m_parallelizer.m_input_buffer.push_back(d);

            if(m_parallelizer.data_ready())
              m_parallelizer.m_dataready_condition.notify_one();
          }
        }

        void flush()
        {
          if(m_parallelizer.m_nthreads>0)
          {
            {
              lock_type lock(m_parallelizer.m_mutex);
              m_parallelizer.errcheck();

              // a batch size of zero marks the end of the input
              m_parallelizer.m_batchsize=0;
              m_parallelizer.m_dataready_condition.notify_all();
            }
            m_parallelizer.join();
          }
        }

      public:

        multiplexer(parallelizer &p)
        :m_parallelizer(p)
        {}
    };

    class thread
    :public shore::pipe_facade<thread,append_type,current_type,true>,
     public shore::thread
    {
      private:

        friend class shore::pipeline_core_access;

        parallelizer &m_parallelizer;

        std::deque<append_type> m_buffer;

        std::ostringstream m_errlog;

        void next()
        {
          if(!m_buffer.empty())
            m_buffer.pop_front();
          if(!m_buffer.empty())
            this->emit(m_buffer.front());
        }

        //void prepare(const append_type &d) {}

        void append(const append_type &d)
        {
          // serial mode: just pass through
          this->emit(d);
        }

        //void flush() {}

      protected:

        virtual void run()
        {
          for(;;)
          {
            if(shore::mpi::comm_isroot())
            {
              lock_type lock(m_parallelizer.m_mutex);

              if(m_parallelizer.m_error_thread!=0)
                return;

              while(!m_parallelizer.data_ready())
                m_parallelizer.m_dataready_condition.wait(lock);

              if(m_parallelizer.m_error_thread!=0)
                return;

              m_buffer.swap(m_parallelizer.m_input_buffer);
            }
            #ifdef WITH_MPI
            else
              m_parallelizer.m_data_requester.request(m_buffer);
            #endif // WITH_MPI

            m_parallelizer.m_dataretrieved_condition.notify_one();

            // an empty batch signals the end of the input
            if(m_buffer.empty())
              break;

            this->emit(m_buffer.front());
          }

          this->sigflush().emit();
        }

        virtual void onjoin()
        {
          lock_type lock(m_parallelizer.m_mutex);
          ++m_parallelizer.m_finished_threads;

          if(status())
          {
            m_parallelizer.m_error_thread=this;
            m_parallelizer.m_batchsize=0;
            m_parallelizer.m_dataretrieved_condition.notify_one();
          }
        }

      public:

        thread(parallelizer &p)
        :shore::thread("parallelizer"),
         m_parallelizer(p)
        {}
    };

    int m_nthreads;
    multiplexer m_multiplexer;

    boost::thread_group m_thread_g;

    size_t m_batchsize;

    mutex_type m_mutex;
    condition_type m_dataready_condition;
    condition_type m_dataretrieved_condition;

    std::deque<append_type> m_input_buffer;

    std::deque<append_type> *m_process_buffers;

    //mutex_type m_join_mutex;
    //condition_type m_join_condition;

    std::vector<thread *> m_threads;
    size_t m_finished_threads;
    thread *m_error_thread;
    bool m_joined;

    shore::slot<void> m_slotjoin;
    shore::signal<void> m_sigjoin;

    void slotfun_join()
    {
      join();
    }

    #ifdef WITH_MPI
    shore::slot<std::deque<append_type> &> m_slotproxy_getdata;
    mpi_proxy::requester<std::deque<append_type> > m_data_requester;

    void slotfun_proxygetdata(std::deque<append_type> &d)
    {
      lock_type lock(m_mutex);

      while(!data_ready())
        m_dataready_condition.wait(lock);

      if(m_input_buffer.empty())
        ++m_finished_threads;

      d.swap(m_input_buffer);
      m_input_buffer.clear();
      m_dataretrieved_condition.notify_one();
    }
    #endif // WITH_MPI

    bool data_ready()
    {
      return m_input_buffer.size()>=m_batchsize;
    }

    void create()
    {
      for(int i=0;i<m_nthreads;++i)
        m_thread_g.create_thread(boost::ref(*m_threads[i]));

      #ifdef WITH_MPI
      if(shore::mpi::comm_isroot()&&(shore::mpi::comm_size()>1))
        shore::mpi_proxy::instance().request_thread();
      #endif // WITH_MPI
    }

    void join()
    {
      m_thread_g.join_all();

      #ifdef WITH_MPI
      shore::mpi::comm_barrier();
      m_data_requester.join();
      #endif // WITH_MPI

      m_joined=true;

      if(m_error_thread)
        throw std::runtime_error(m_error_thread->errors());

      m_sigjoin.emit();
    }

    void errcheck()
    {
      bool err=false;

      {
        lock_type lock(m_mutex);
        err=(m_error_thread!=0);
      }

      if(err)
      {
        m_dataready_condition.notify_all();
        join();
      }
    }

  protected:

    virtual void parallelization_setup()
    {
      m_parallelization_numthreads=m_nthreads;
    }

  public:

    parallelizer(int nthreads,int batchsize=1)
    :m_nthreads(nthreads),
     m_multiplexer(*this),
     m_batchsize(batchsize),
     m_finished_threads(0),
     m_error_thread(0),
     m_joined(false),
     m_slotjoin(boost::bind(&parallelizer::slotfun_join,this))
    #ifdef WITH_MPI
    ,
     m_slotproxy_getdata(boost::bind(&parallelizer::slotfun_proxygetdata,this,_1))
    #endif // WITH_MPI
    {
      m_threads.push_back(new thread(*this));

      for(int i=1;i<m_nthreads;++i)
        m_threads.push_back(new thread(*this));
      #ifdef WITH_MPI
      m_data_requester.siggetdata().connect(m_slotproxy_getdata);
      #endif // WITH_MPI

      // under MPI every process needs at least one worker thread
      if(shore::mpi::comm_size()>1)
        m_nthreads=std::max(1,m_nthreads);
      else
        m_nthreads=std::max(0,m_nthreads);

      if(m_nthreads==0)
        m_multiplexer|(*m_threads.front());
      else
        create();
    }

    ~parallelizer()
    {
      // if not joined due to error in the root thread
      if(!m_joined)
      {
        m_batchsize=0;
        m_input_buffer.clear();
        m_dataready_condition.notify_all();
        join();
      }

      while(!m_threads.empty())
      {
        delete m_threads.back();
        m_threads.pop_back();
      }
      #ifdef WITH_MPI
      //m_data_requester.join();
      m_data_requester.unsubscribe();
      #endif // WITH_MPI
    }

    int nthreads() const
    {
      return m_nthreads;
    }
};
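
/*
 * Usage sketch (illustrative only): with nthreads==0 the parallelizer
 * degrades to a pass-through; otherwise worker threads pull batches of
 * batchsize records from the input buffer.
 *
 *   shore::parallelizer<record> paz(4,64);  // 4 threads, 64 records/batch
 *   // appended records go to whichever worker becomes idle first
 */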

// Pipeline composition operators; each connects two parallelization-aware
// elements and registers the deferred per-thread wiring.
template<typename T,typename U>
serial<U> &operator|(serial<T> &ser1,serial<U> &ser2);

template<typename T,typename U>
parallel<U> &operator|(parallel<T> &par1,parallel<U> &par2);

template<typename T,typename U>
parallelizer<U> &operator|(serial<T> &ser,parallelizer<U> &paz);

template<typename T,typename U>
parallel<U> &operator|(parallelizer<T> &paz,parallel<U> &par);

template<typename T,typename U>
sync<U> &operator|(parallel<T> &par,sync<U> &syn);

template<typename T,typename U>
serial<U> &operator|(sync<T> &syn,serial<U> &ser);

template<typename T,typename U>
desync<U> &operator|(serial<T> &ser,desync<U> &dsc);

template<typename T,typename U>
parallel<U> &operator|(desync<T> &dsc,parallel<U> &par);

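/*
 * Putting it together (illustrative sketch; "reader", "filter_stage" and
 * "line_sink" are hypothetical pipe_facade-based stages):
 *
 *   shore::serial<reader> src("in.txt");
 *   shore::parallelizer<record> paz(4);
 *   shore::parallel<filter_stage> filters;
 *   shore::sync<record> syn;
 *   shore::serial<line_sink> sink("out.txt");
 *
 *   src|paz|filters|syn|sink;  // fan out, process, fan back in
 *   shore::dump(src);          // drive the pipeline via dump() below
 */
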
// Grants the composition operators access to the elements' internals.
class parallelization_core_access
{
  private:

    template<typename T>
    friend void dump(serial<T> &);

    template<typename T,typename U>
    friend serial<U> &operator|(serial<T> &ser1,serial<U> &ser2);
    template<typename T,typename U>
    friend parallel<U> &operator|(parallel<T> &par1,parallel<U> &par2);
    template<typename T,typename U>
    friend parallelizer<U> &operator|(serial<T> &ser,parallelizer<U> &paz);
    template<typename T,typename U>
    friend parallel<U> &operator|(parallelizer<T> &paz,parallel<U> &par);
    template<typename T,typename U>
    friend sync<U> &operator|(parallel<T> &par,sync<U> &syn);
    template<typename T,typename U>
    friend serial<U> &operator|(sync<T> &syn,serial<U> &ser);
    template<typename T,typename U>
    friend desync<U> &operator|(serial<T> &ser,desync<U> &dsc);
    template<typename T,typename U>
    friend parallel<U> &operator|(desync<T> &dsc,parallel<U> &par);

    parallelization_core_access();


    template<typename T,typename U>
    class pci_serial_serial
    :public parallel_connection_init
    {
      private:

        serial<T> *m_serial1;
        serial<U> *m_serial2;

      public:

        pci_serial_serial(serial<T> &s1,serial<U> &s2)
        :m_serial1(&s1),m_serial2(&s2)
        {}

        virtual void init()
        {
          if(m_serial1->num_threads()!=m_serial2->num_threads())
            throw std::logic_error("tried to connect pipeline elements"
                                   " with differing number of threads");
          for(int i=0;i<m_serial1->num_threads();++i)
          {
            typename serial<T>::lock_link &ll1=*(m_serial1->m_locklinks[i]);
            typename serial<U>::lock_link &ll2=*(m_serial2->m_locklinks[i]);

            ll1.sigpostlock().connect(ll2.slotpostlock());
            ll1.sigpreunlock().connect(ll2.slotpreunlock());
            ll1.sigpostunlock().connect(ll2.slotpostunlock());
            ll1.sigflushthread().connect(ll2.slotflushthread());
          }
        }
    };

    template<typename T,typename U>
    class pci_parallel_parallel
    :public parallel_connection_init
    {
      private:

        parallel<T> *m_parallel1;
        parallel<U> *m_parallel2;

      public:

        pci_parallel_parallel(parallel<T> &s1,parallel<U> &s2)
        :m_parallel1(&s1),m_parallel2(&s2)
        {}

        virtual void init()
        {
          if(m_parallel1->num_threads()!=m_parallel2->num_threads())
            throw std::logic_error("tried to connect pipeline elements"
                                   " with differing number of threads");
          for(int i=0;i<m_parallel1->num_threads();++i)
            (*(m_parallel1->m_objects[i]))|(*(m_parallel2->m_objects[i]));
        }
    };

    template<typename T,typename U>
    class pci_parallelizer_parallel
    :public parallel_connection_init
    {
      private:

        parallelizer<T> *m_parallelizer;
        parallel<U> *m_parallel;

      public:

        pci_parallelizer_parallel(parallelizer<T> &s1,parallel<U> &s2)
        :m_parallelizer(&s1),m_parallel(&s2)
        {}

        virtual void init()
        {
          if(m_parallelizer->num_threads()!=m_parallel->num_threads())
            throw std::logic_error("tried to connect pipeline elements"
                                   " with differing number of threads");
          (*(m_parallelizer->m_threads[0]))|(*(m_parallel->m_objects[0]));
          for(int i=1;i<m_parallelizer->num_threads();++i)
            (*(m_parallelizer->m_threads[i]))|(*(m_parallel->m_objects[i]));
        }
    };

    template<typename T,typename U>
    class pci_parallel_sync
    :public parallel_connection_init
    {
      private:

        parallel<T> *m_parallel;
        sync<U> *m_sync;

      public:

        pci_parallel_sync(parallel<T> &s1,sync<U> &s2)
        :m_parallel(&s1),m_sync(&s2)
        {}

        virtual void init()
        {
          if(m_parallel->num_threads()!=m_sync->num_threads())
            throw std::logic_error("tried to connect pipeline elements"
                                   " with differing number of threads");
          (*(m_parallel->m_objects[0]))|(*(m_sync->m_thread_tracks[0]));
          for(int i=1;i<m_parallel->num_threads();++i)
            (*(m_parallel->m_objects[i]))|(*(m_sync->m_thread_tracks[i]));
        }
    };

    template<typename T,typename U>
    class pci_sync_serial
    :public parallel_connection_init
    {
      private:

        sync<T> *m_sync;
        serial<U> *m_serial;

      public:

        pci_sync_serial(sync<T> &s1,serial<U> &s2)
        :m_sync(&s1),m_serial(&s2)
        {}

        virtual void init()
        {
          if(m_sync->num_threads()!=m_serial->num_threads())
            throw std::logic_error("tried to connect pipeline elements"
                                   " with differing number of threads");
          if(m_sync->num_threads()==0)
            (*(m_sync->m_thread_tracks[0]))|(*(m_serial->m_object));
          else
          {
            for(int i=0;i<m_sync->num_threads();++i)
            {
              if(m_serial->m_object)
                (*(m_sync->m_thread_tracks[i]))|(*(m_serial->m_object));

              typename serial<U>::lock_link &ll=*(m_serial->m_locklinks[i]);

              m_sync->m_thread_tracks[i]->m_sigpostlock.connect(ll.slotpostlock());
              m_sync->m_thread_tracks[i]->m_sigpreunlock.connect(ll.slotpreunlock());
              m_sync->m_thread_tracks[i]->m_sigpostunlock.connect(ll.slotpostunlock());
              m_sync->m_thread_tracks[i]->sigflush().connect(ll.slotflushthread());
            }

            if(m_serial->m_object)
            {
              for(size_t i=1;i<shore::mpi::comm_size();++i)
                (*(m_sync->m_proxy_tracks[i-1]))|(*(m_serial->m_object));

              m_serial->m_object->slotflush().disconnect_all();

              m_sync->m_sigsyncflush.connect(m_serial->m_object->slotflush());
            }
          }
        }
    };

    template<typename T,typename U>
    class pci_serial_desync
    :public parallel_connection_init
    {
      private:

        serial<T> *m_serial;
        desync<U> *m_desync;

      public:

        pci_serial_desync(serial<T> &s1,desync<U> &s2)
        :m_serial(&s1),m_desync(&s2)
        {}

        virtual void init()
        {
          if(m_serial->num_threads()!=m_desync->num_threads())
            throw std::logic_error("tried to connect pipeline elements"
                                   " with differing number of threads");
          for(int i=0;i<m_serial->num_threads();++i)
          {
            typename serial<T>::lock_link &ll=*(m_serial->m_locklinks[i]);

            ll.sigpostlock().connect(m_desync->m_multiplexer.slotpostlock());
            ll.sigpreunlock().connect(m_desync->m_multiplexer.slotpreunlock());
            ll.sigpostunlock().connect(m_desync->m_multiplexer.slotpostunlock());
            ll.sigflushthread().connect(m_desync->m_multiplexer.slotflushthread());
          }
        }
    };

    template<typename T,typename U>
    class pci_desync_parallel
    :public parallel_connection_init
    {
      private:

        desync<T> *m_desync;
        parallel<U> *m_parallel;

      public:

        pci_desync_parallel(desync<T> &s1,parallel<U> &s2)
        :m_desync(&s1),m_parallel(&s2)
        {}

        virtual void init()
        {
          if(m_desync->num_threads()!=m_parallel->num_threads())
            throw std::logic_error("tried to connect pipeline elements"
                                   " with differing number of threads");
          if(m_desync->num_threads()==0)
            (m_desync->m_multiplexer)|(*(m_parallel->m_objects[0]));
          for(int i=0;i<m_desync->num_threads();++i)
            (*(m_desync->m_thread_tracks[i]))|(*(m_parallel->m_objects[i]));
        }
    };


    template<typename T>
    static void dump(serial<T> &ser)
    {
      ser.m_signumthreads.emit(0);

      if(shore::mpi::comm_isroot())
        ser.call(&T::dump);
      else
        // non-root process: join the threads
        ser.m_sigjoin.emit();
    }

    template<typename T,typename U>
    static serial<U> &connect(serial<T> &ser1,serial<U> &ser2)
    {
      ser1.m_sigjoin.connect(ser2.m_slotjoin);
      ser1.m_signumthreads.connect(ser2.m_slotnumthreads);

      if((ser1.m_object!=0)&&(ser2.m_object!=0))
        (*ser1.m_object)|(*ser2.m_object);

      ser1.m_pci.push_back(new pci_serial_serial<T,U>(ser1,ser2));

      return ser2;
    }

    template<typename T,typename U>
    static parallel<U> &connect(parallel<T> &par1,parallel<U> &par2)
    {
      par1.m_sigjoin.connect(par2.m_slotjoin);
      par1.m_signumthreads.connect(par2.m_slotnumthreads);

      par1.m_pci.push_back(new pci_parallel_parallel<T,U>(par1,par2));

      return par2;
    }

    template<typename T,typename U>
    static parallelizer<U> &connect(serial<T> &ser,parallelizer<U> &paz)
    {
      ser.m_sigjoin.connect(paz.m_slotjoin);
      ser.m_signumthreads.connect(paz.m_slotnumthreads);

      if(ser.m_object)
        (*ser.m_object)|paz.m_multiplexer;
      return paz;
    }

    template<typename T,typename U>
    static parallel<U> &connect(parallelizer<T> &paz,parallel<U> &par)
    {
      paz.m_sigjoin.connect(par.m_slotjoin);
      paz.m_signumthreads.connect(par.m_slotnumthreads);

      paz.m_pci.push_back(new pci_parallelizer_parallel<T,U>(paz,par));

      return par;
    }

    template<typename T,typename U>
    static sync<U> &connect(parallel<T> &par,sync<U> &syn)
    {
      par.m_sigjoin.connect(syn.m_slotjoin);
      par.m_signumthreads.connect(syn.m_slotnumthreads);

      par.m_pci.push_back(new pci_parallel_sync<T,U>(par,syn));

      return syn;
    }

    template<typename T,typename U>
    static serial<U> &connect(sync<T> &syn,serial<U> &ser)
    {
      syn.m_sigjoin.connect(ser.m_slotjoin);
      syn.m_signumthreads.connect(ser.m_slotnumthreads);

      syn.m_pci.push_back(new pci_sync_serial<T,U>(syn,ser));

      return ser;
    }

    template<typename T,typename U>
    static desync<U> &connect(serial<T> &ser,desync<U> &dsc)
    {
      ser.m_sigjoin.connect(dsc.m_slotjoin);
      ser.m_signumthreads.connect(dsc.m_slotnumthreads);

      // only the root process owns the wrapped object
      if(ser.m_object)
        (*ser.m_object)|dsc.m_multiplexer;

      ser.m_pci.push_back(new pci_serial_desync<T,U>(ser,dsc));

      return dsc;
    }

    template<typename T,typename U>
    static parallel<U> &connect(desync<T> &dsc,parallel<U> &par)
    {
      dsc.m_sigjoin.connect(par.m_slotjoin);
      dsc.m_signumthreads.connect(par.m_slotnumthreads);
      dsc.m_pci.push_back(new pci_desync_parallel<T,U>(dsc,par));

      return par;
    }
};

template<typename T>
void dump(serial<T> &ser)
{
  parallelization_core_access::dump(ser);
}

template<typename T,typename U>
serial<U> &operator|(serial<T> &ser1,serial<U> &ser2)
{
  return parallelization_core_access::connect(ser1,ser2);
}

template<typename T,typename U>
parallel<U> &operator|(parallel<T> &par1,parallel<U> &par2)
{
  return parallelization_core_access::connect(par1,par2);
}

template<typename T,typename U>
parallelizer<U> &operator|(serial<T> &ser,parallelizer<U> &paz)
{
  return parallelization_core_access::connect(ser,paz);
}

template<typename T,typename U>
parallel<U> &operator|(parallelizer<T> &paz,parallel<U> &par)
{
  return parallelization_core_access::connect(paz,par);
}

template<typename T,typename U>
sync<U> &operator|(parallel<T> &par,sync<U> &syn)
{
  return parallelization_core_access::connect(par,syn);
}

template<typename T,typename U>
serial<U> &operator|(sync<T> &syn,serial<U> &ser)
{
  return parallelization_core_access::connect(syn,ser);
}

template<typename T,typename U>
desync<U> &operator|(serial<T> &ser,desync<U> &dsc)
{
  return parallelization_core_access::connect(ser,dsc);
}

template<typename T,typename U>
parallel<U> &operator|(desync<T> &dsc,parallel<U> &par)
{
  return parallelization_core_access::connect(dsc,par);
}

} // namespace shore

#endif // SHORE_PARALLEL_PARALLEL_HPP__