26 #ifndef SHORE_PARALLEL_PARALLEL_HPP__
27 #define SHORE_PARALLEL_PARALLEL_HPP__
35 #include <boost/thread/thread.hpp>
36 #include <boost/thread/recursive_mutex.hpp>
37 #include <boost/thread/condition.hpp>
40 #include <boost/serialization/deque.hpp>
// Forward declaration: the classes below name parallelization_core_access as
// a friend before the class itself is defined further down in this file.
54 class parallelization_core_access;
// Abstract interface for a connection step between two pipeline elements.
// parallelization_base stores pointers to objects of this type in m_pci, and
// the pci_* classes further down derive from it.
56 class parallel_connection_init
// Virtual destructor with an empty body.
60 virtual ~parallel_connection_init() {}
// Pure virtual hook that performs the actual connection work.
61 virtual void init()=0;
// Common base of the pipeline parallelization elements. Holds the thread
// count and the list of connection initializers (m_pci), and reacts to a
// thread-count notification in slotfun_nthreads().
// NOTE(review): this listing has lost brace-only and some keyword-only lines
// (the embedded line numbers skip); access specifiers and the declarations of
// m_signumthreads / m_slotnumthreads are not visible here.
64 class parallelization_base
// Hook called by slotfun_nthreads() right after the thread count was stored.
68 virtual void parallelization_setup()=0;
// Thread count as last stored by slotfun_nthreads().
70 int m_parallelization_numthreads;
74 friend class parallelization_core_access;
// Connection initializers; parallelization_core_access::connect() appends to
// this vector with objects allocated via new.
76 std::vector<parallel_connection_init *> m_pci;
// Stores the requested thread count (clamped), calls parallelization_setup()
// and emits the stored count on m_signumthreads.
83 void slotfun_nthreads(
const int & nthreads)
// With more than one MPI process the count is clamped to at least 1.
85 if(shore::mpi::comm_size()>1)
86 m_parallelization_numthreads=std::max(1,nthreads);
// The assignment below clamps to at least 0.
// NOTE(review): an `else` (embedded line 87) appears to be missing from this
// listing; as shown this assignment would always run -- confirm.
88 m_parallelization_numthreads=std::max(0,nthreads);
90 parallelization_setup();
91 m_signumthreads.emit(m_parallelization_numthreads);
// Iterates over all registered connection initializers; the loop body is not
// visible in this listing.
93 for(
size_t i=0;i<m_pci.size();++i)
// Copy constructor and copy assignment are declared without a body here
// (presumably to forbid copying -- confirm).
97 parallelization_base(
const parallelization_base &);
98 parallelization_base & operator=(
const parallelization_base &);
// Default constructor: binds m_slotnumthreads to slotfun_nthreads() on this
// object.
102 parallelization_base()
103 :m_slotnumthreads(boost::bind(&parallelization_base::slotfun_nthreads,this,_1))
// Destructor: loops until m_pci is empty; the loop body is not visible in
// this listing.
106 virtual ~parallelization_base()
108 while(!m_pci.empty())
// Returns the currently stored thread count.
115 int num_threads()
const
117 return m_parallelization_numthreads;
// Base clause and first members of a class template derived from
// parallelization_base.
// NOTE(review): the `template<...> class ...` header line is not visible in
// this listing; the constructors further down are named `serial`.
125 :
public parallelization_base
129 friend class parallelization_core_access;
// Type of the wrapped object (T is presumably the class template parameter).
131 typedef T object_type;
// Member-initializer list: each slot member is constructed from a reference
// to the emit member of the signal with the matching name.
// NOTE(review): the enclosing constructor signature is not visible; these
// members presumably belong to the nested lock_link type -- confirm.
152 :m_slotpostlock(boost::ref(m_sigpostlock.emit)),
153 m_slotpreunlock(boost::ref(m_sigpreunlock.emit)),
154 m_slotpostunlock(boost::ref(m_sigpostunlock.emit)),
155 m_slotflushthread(boost::ref(m_sigflushthread.emit))
// Accessor bodies returning the four slots and then the four signals. Their
// signatures are not visible in this listing; elsewhere in the file the
// accessors are called as slotpostlock(), slotpreunlock(), slotpostunlock(),
// slotflushthread(), sigpostlock(), sigpreunlock(), sigpostunlock() and
// sigflushthread().
160 return m_slotpostlock;
165 return m_slotpreunlock;
170 return m_slotpostunlock;
175 return m_slotflushthread;
180 return m_sigpostlock;
185 return m_sigpreunlock;
190 return m_sigpostunlock;
195 return m_sigflushthread;
// Pointer to the wrapped object.
199 object_type *m_object;
// Heap-allocated lock_link objects; deleted and re-created in
// parallelization_setup().
200 std::vector<lock_link *> m_locklinks;
// Rebuilds m_locklinks: deletes every existing link, then allocates
// num_threads() new ones.
211 virtual void parallelization_setup()
213 while(!m_locklinks.empty())
215 delete m_locklinks.back();
216 m_locklinks.pop_back();
219 for(
int i=0;i<num_threads();++i)
220 m_locklinks.push_back(
new lock_link);
// Constructors taking zero to three arguments. Each one constructs m_slotjoin
// from a reference to m_sigjoin.emit and allocates the wrapped object with
// the given arguments when shore::mpi::comm_isroot() is true.
// NOTE(review): the signature lines of the first two constructors are not
// visible in this listing, and what happens to m_object when comm_isroot()
// is false cannot be determined from the visible lines.
229 :m_slotjoin(boost::ref(m_sigjoin.emit))
231 if(shore::mpi::comm_isroot())
232 m_object=
new object_type;
// One-argument variant (argument named `arg`).
239 :m_slotjoin(boost::ref(m_sigjoin.emit))
241 if(shore::mpi::comm_isroot())
242 m_object=
new object_type(arg);
// Two-argument variant.
247 template<
typename A1,
typename A2>
248 serial(
const A1 & arg1,
const A2 & arg2)
249 :m_slotjoin(boost::ref(m_sigjoin.emit))
251 if(shore::mpi::comm_isroot())
252 m_object=
new object_type(arg1,arg2);
// Three-argument variant.
257 template<
typename A1,
typename A2,
typename A3>
258 serial(
const A1 & arg1,
const A2 & arg2,
const A3 & arg3)
259 :m_slotjoin(boost::ref(m_sigjoin.emit))
261 if(shore::mpi::comm_isroot())
262 m_object=
new object_type(arg1,arg2,arg3);
// Deletes every lock_link still held in m_locklinks.
// NOTE(review): the enclosing signature is not visible in this listing;
// presumably this is the destructor -- confirm.
269 while(!m_locklinks.empty())
271 delete m_locklinks.back();
272 m_locklinks.pop_back();
// call(): invokes a member function of the wrapped object through a
// pointer-to-member and returns its result.
// NOTE(review): lines between each signature and its return statement are
// not visible in this listing (embedded numbering skips), and the template
// header of the first overload is missing.
// Overload for member functions without parameters.
279 R call(R (object_type::*func)())
282 return (m_object->*func)();
// Overload for member functions taking `const A &`.
286 template<
typename R,
typename A>
287 R call(R (object_type::*func)(
const A &),
const A &arg)
290 return (m_object->*func)(arg);
// Overload for member functions declared with a `const A` parameter.
294 template<
typename R,
typename A>
295 R call(R (object_type::*func)(
const A),
const A &arg)
298 return (m_object->*func)(arg);
// Base clause and first members of a class template derived from
// parallelization_base.
// NOTE(review): the `template<...> class ...` header line is not visible in
// this listing; the pci_* classes further down reach the m_objects member
// declared below through parallel<T> pointers.
306 :
public parallelization_base
310 friend class parallelization_core_access;
// Type of the per-thread objects (T is presumably the class template parameter).
312 typedef T object_type;
// Abstract factory interface: create() returns a pointer to an object_type.
// NOTE(review): the class header line is not visible in this listing; the
// name is taken from the destructor below.
317 virtual ~factory_base() {}
318 virtual object_type *create()=0;
// Factory template parameterized on the object type; its create() returns an
// Obj pointer.
// NOTE(review): class name line and the body of create() are not visible in
// this listing; presumably this is arg0_factory, which is instantiated
// elsewhere in this file -- confirm.
321 template<
typename Obj>
325 virtual Obj *create()
// Factory template parameterized on the object type and one argument type.
// The constructor receives the argument; create() returns an Obj pointer.
// NOTE(review): the class header, the member that stores the argument and
// the body of create() are not visible in this listing.
331 template<
typename Obj,
typename Arg>
337 arg1_factory(
const Arg & a)
341 virtual Obj *create()
// Objects obtained from m_factory; deleted and re-created in
// parallelization_setup().
348 std::vector<object_type *> m_objects;
// Factory used to create the objects; assigned in the constructors.
349 factory_base * m_factory;
// Rebuilds m_objects: deletes all existing objects, then creates one object
// unconditionally and one more for every thread index starting at 1, so the
// vector holds at least one object even when num_threads() is 0.
356 virtual void parallelization_setup()
358 while(!m_objects.empty())
360 delete m_objects.back();
361 m_objects.pop_back();
364 m_objects.push_back(m_factory->create());
365 for(
int i=1;i<num_threads();++i)
366 m_objects.push_back(m_factory->create());
// Constructor fragments: each constructs m_slotjoin from a reference to
// m_sigjoin.emit and allocates the factory used by parallelization_setup().
// NOTE(review): the constructor signature lines are not visible in this
// listing.
// Variant without arguments: objects are created by an arg0_factory.
373 :m_slotjoin(boost::ref(m_sigjoin.emit))
375 m_factory=
new arg0_factory<object_type>;
// Variant with one argument `arg`: it is handed to an arg1_factory.
380 :m_slotjoin(boost::ref(m_sigjoin.emit))
382 m_factory=
new arg1_factory<object_type,A>(arg);
// Deletes every object still held in m_objects.
// NOTE(review): the enclosing signature is not visible in this listing
// (presumably the destructor), and no deletion of m_factory is visible in
// these lines -- confirm against the original header.
387 while(!m_objects.empty())
389 delete m_objects.back();
390 m_objects.pop_back();
// Base clause, typedefs and data members of a class template derived from
// parallelization_base.
// NOTE(review): the `template<...> class ...` header line is not visible in
// this listing; the pci_* classes further down reach m_thread_tracks and
// m_proxy_tracks through sync<T> pointers.
401 :
public parallelization_base
// Element type accepted by append() and the current element type.
405 typedef T append_type;
406 typedef T current_type;
410 friend class parallelization_core_access;
// Locking primitives: a recursive mutex, its scoped lock and a condition.
412 typedef boost::recursive_mutex mutex_type;
413 typedef boost::recursive_mutex::scoped_lock lock_type;
414 typedef boost::condition condition_type;
// Waited on until enough processes have flushed; notified by
// slotfun_proxyflush().
420 condition_type m_flush_condition;
// Thread ids and process numbers inserted by the track flush code.
425 std::set<int> m_flushed_threads;
426 std::set<int> m_flushed_processes;
// Heap-allocated tracks: one set for local threads, one for proxied processes.
429 std::vector<track *> m_thread_tracks;
430 std::vector<track *> m_proxy_tracks;
// Fragment of a member function whose signature is not visible in this
// listing. When shore::mpi::comm_isroot() is true it locks m_mutex and waits
// on m_flush_condition until m_flushed_processes holds comm_size()-1
// entries. The visible lines then join both submitters.
// NOTE(review): the nesting of the join calls relative to the `if` cannot be
// recovered from this listing.
435 if(shore::mpi::comm_isroot())
437 lock_type lock(m_mutex);
439 while(m_flushed_processes.size()<(shore::mpi::comm_size()-1))
440 m_flush_condition.wait(lock);
442 m_data_submitter.join();
443 m_flush_submitter.join();
// Feeds indexed by source process number minus one; sized in the constructor
// to comm_size()-1.
450 std::vector<shore::feed<append_type> > m_proxy_feeds;
// Submitter for data elements.
452 mpi_proxy::submitter<append_type> m_data_submitter;
// Submitter for flush notifications (carries an int).
455 mpi_proxy::submitter<int> m_flush_submitter;
// Appends the received element to the feed belonging to the submitter's
// current source (get_source()-1).
458 void slotfun_proxyappend(
const append_type &d)
460 m_proxy_feeds[m_data_submitter.get_source()-1].append(d);
// Flushes the feed belonging to the flush submitter's current source
// (get_source()-1), then wakes every waiter on m_flush_condition. The int
// payload is unnamed and unused.
463 void slotfun_proxyflush(
const int &)
465 m_proxy_feeds[m_flush_submitter.get_source()-1].flush();
466 m_flush_condition.notify_all();
// First members of the nested track type.
// NOTE(review): the class header is not visible in this listing; the name is
// taken from the copy declarations below.
484 friend class parallelization_core_access;
// Process number passed to the constructor; emitted with the lock signals.
489 int m_proxied_process;
// Copy constructor and copy assignment are declared without a body here
// (presumably to forbid copying -- confirm).
495 track(
const track &);
496 track &operator=(
const track &);
// Fragments of lock-handling member functions whose signatures are not
// visible in this listing.
// Allocates a scoped lock on the owning sync's mutex, then emits the
// post-lock signal with the proxied process number.
501 m_lock=
new lock_type(m_sync.m_mutex);
502 m_sigpostlock.emit(m_proxied_process);
// Emits the pre-unlock signal. NOTE(review): the release of m_lock is not
// visible in this listing.
507 m_sigpreunlock.emit(m_proxied_process);
// Emits the post-unlock signal.
512 m_sigpostunlock.emit(m_proxied_process);
// append(): the visible part submits the element through the owning sync's
// data submitter when shore::mpi::comm_isroot() is false.
// NOTE(review): further lines of this function (embedded lines 520-526 and
// after 528) are not visible in this listing.
519 void append(
const append_type &d)
527 if(!shore::mpi::comm_isroot())
528 m_sync.m_data_submitter.submit(d);
// Fragment of the flush handling; the signature and several lines (including
// the bodies of some branches) are not visible in this listing.
// Branch for a thread count of zero; its body is not visible.
545 if(m_sync.num_threads()==0)
// A track with proxied process 0 records the calling thread's id as flushed.
554 if(m_proxied_process==0)
555 m_sync.m_flushed_threads.insert(thread_index::id());
// Records the proxied process as flushed. NOTE(review): presumably the
// `else` of the test above -- confirm.
558 m_sync.m_flushed_processes.insert(m_proxied_process);
// Number of threads that have not been recorded as flushed.
561 const int unflushed_threads=
562 m_sync.num_threads()-int(m_sync.m_flushed_threads.size());
// Number of other processes not recorded as flushed. NOTE(review): the
// assignment head and its condition are not visible in this listing.
564 int unflushed_processes=0;
569 shore::mpi::comm_size()-int(m_sync.m_flushed_processes.size())-1;
// On the root process: emit the sync-flush signal once nothing is left
// unflushed.
572 if(shore::mpi::comm_isroot())
574 if((unflushed_threads==0)&&(unflushed_processes==0))
575 m_sync.m_sigsyncflush.emit();
// Otherwise, once all local threads have flushed, submit a flush
// notification (value 0).
578 else if(unflushed_threads==0)
579 m_sync.m_flush_submitter.submit(0);
// Constructor: takes the owning sync and the proxied process number, which
// is stored in m_proxied_process.
// NOTE(review): the other member initializers are not visible in this
// listing.
598 track(
sync &s,
const int pr)
601 m_proxied_process(pr)
// Rebuilds m_thread_tracks: deletes all existing tracks, then creates one
// track unconditionally and one more for every thread index starting at 1.
// All of them are constructed with proxied process 0.
607 virtual void parallelization_setup()
609 while(!m_thread_tracks.empty())
611 delete m_thread_tracks.back();
612 m_thread_tracks.pop_back();
615 m_thread_tracks.push_back(
new track(*
this,0));
616 for(
int i=1;i<num_threads();++i)
617 m_thread_tracks.push_back(
new track(*
this,0));
// Constructor fragment.
// NOTE(review): the signature and some initializer lines (embedded lines 624,
// 625) are not visible in this listing.
// Slots: join is built from m_sigjoin.emit; the two proxy slots are bound to
// slotfun_proxyappend() / slotfun_proxyflush() on this object.
623 :m_slotjoin(boost::ref(m_sigjoin.emit))
626 m_slotproxy_data(boost::bind(&sync::slotfun_proxyappend,
this,_1)),
627 m_slotproxy_flush(boost::bind(&sync::slotfun_proxyflush,
this,_1))
// On the root process: connect both submitters to the proxy slots, then
// create one feed and one proxy track per other process (1..comm_size()-1)
// and connect each feed to its track with operator|.
631 if(shore::mpi::comm_isroot())
633 m_data_submitter.sigdata().connect(m_slotproxy_data);
634 m_flush_submitter.sigdata().connect(m_slotproxy_flush);
636 m_proxy_feeds.resize(shore::mpi::comm_size()-1);
637 for(
size_t i=1;i<shore::mpi::comm_size();++i)
639 m_proxy_tracks.push_back(
new track(*
this,i));
640 m_proxy_feeds[i-1]|(*m_proxy_tracks.back());
// Deletes all thread tracks and all proxy tracks, then unsubscribes both
// submitters.
// NOTE(review): the enclosing signature is not visible in this listing;
// presumably the destructor -- confirm.
648 while(!m_thread_tracks.empty())
650 delete m_thread_tracks.back();
651 m_thread_tracks.pop_back();
654 while(!m_proxy_tracks.empty())
656 delete m_proxy_tracks.back();
657 m_proxy_tracks.pop_back();
659 m_data_submitter.unsubscribe();
660 m_flush_submitter.unsubscribe();
// Returns a reference to the thread track at index i. The index is not
// range-checked in the visible lines.
664 track &operator[](
const size_t i)
666 return *m_thread_tracks[i];
// Base clause, typedefs and data members of a class template derived from
// parallelization_base.
// NOTE(review): the `template<...> class ...` header line is not visible in
// this listing; member functions are bound as desync::... in the
// constructor.
676 :
public parallelization_base
680 friend class parallelization_core_access;
683 typedef T append_type;
684 typedef T current_type;
686 typedef boost::recursive_mutex mutex_type;
687 typedef boost::recursive_mutex::scoped_lock lock_type;
// Multiplexer object; its slots are connected to lock_link signals by
// pci_serial_desync.
698 multiplexer m_multiplexer;
// One buffer per local thread (rebuilt in parallelization_setup()).
701 std::vector<lock_buffer *> m_thread_tracks;
// One buffer per other process (filled in the constructor on the root).
703 std::vector<lock_buffer *> m_process_tracks;
// Maps a thread id to a 1-based index into m_thread_tracks.
706 std::map<boost::thread::id,int> m_bufmapping;
// Requester used to fetch buffered data (a deque of elements).
710 mpi_proxy::requester<std::deque<append_type> > m_data_requester;
// Handler for a data request: looks up the lock buffer belonging to the
// requester's current target process.
// NOTE(review): the lines that use `b` and fill `d` are not visible in this
// listing.
712 void slotfun_proxygetdata(std::deque<append_type> &d)
714 const int pr=m_data_requester.get_target();
715 lock_buffer &b=get_lockbuffer(pr);
// Returns the lock buffer for the given process number while holding
// m_mutex.
// NOTE(review): the conditions guarding the two return paths are not visible
// in this listing (embedded lines 725 and 729 are missing).
722 lock_buffer &get_lockbuffer(
int process)
724 lock_type lock(m_mutex);
// Path for a remote process: buffer at index process-1.
726 return *m_process_tracks[process-1];
// Path for a local thread: operator[] inserts a zero entry for a thread id
// not seen before.
728 int &thr=m_bufmapping[boost::this_thread::get_id()];
// Assigns the current map size as the 1-based index (presumably only when
// the entry is new -- confirm).
730 thr=m_bufmapping.size();
731 return *m_thread_tracks[thr-1];
// Members of the nested lock_buffer type, which queues elements in a deque.
// NOTE(review): the class header and most member signatures are not visible
// in this listing.
740 friend class parallelization_core_access;
741 friend class multiplexer;
// Queued elements.
743 std::deque<append_type> m_buffer;
// Fragment: drops the front element and, if any remain, emits the new front.
747 m_buffer.pop_front();
749 if(!m_buffer.empty())
750 this->emit(m_buffer.front());
// Queues one element at the back.
755 void append(
const append_type &d)
757 m_buffer.push_back(d);
// Accessor giving direct access to the queue (body not visible).
767 std::deque<append_type> &buffer()
// Fragment of an emptiness query (signature not visible).
774 return m_buffer.empty();
// Returns the element at the front of the queue.
777 const append_type &front()
const
779 return m_buffer.front();
// Fragment: drops the front element (signature not visible).
784 m_buffer.pop_front();
// Fragment: emits the front element when the queue is not empty
// (signature not visible).
789 if(!m_buffer.empty())
790 this->emit(m_buffer.front());
// Lock-signal handlers of the nested multiplexer type; `pr` is the process
// number carried by the signal.
// NOTE(review): the class header and several lines inside the handlers are
// not visible in this listing.
801 friend class parallelization_core_access;
// After a lock was taken: look up the lock buffer for `pr`; when
// shore::mpi::comm_isroot() is false, request data into that buffer.
811 void slotfun_postlock(
int pr)
813 lock_buffer &b=m_desync.get_lockbuffer(pr);
817 if(!shore::mpi::comm_isroot())
818 m_desync.m_data_requester.request(b.buffer());
// Before unlocking: disconnect this element's outputs.
822 void slotfun_preunlock(
int pr)
824 this->disconnect_outputs();
// After unlocking: call dump() on the lock buffer for `pr`.
827 void slotfun_postunlock(
int pr)
834 m_desync.get_lockbuffer(pr).dump();
// append(): body not visible in this listing.
841 void append(
const append_type &d)
// Constructor initializer fragment: the three lock slots are bound to the
// matching slotfun_* handlers on this object.
852 m_slotpostlock(boost::bind(&multiplexer::slotfun_postlock,
this,_1)),
853 m_slotpreunlock(boost::bind(&multiplexer::slotfun_preunlock,
this,_1)),
854 m_slotpostunlock(boost::bind(&multiplexer::slotfun_postunlock,
this,_1))
// Accessor bodies returning the three slots (signatures not visible).
859 return m_slotpostlock;
864 return m_slotpreunlock;
869 return m_slotpostunlock;
// Rebuilds m_thread_tracks: deletes every existing lock buffer, then
// allocates num_threads() new ones.
879 virtual void parallelization_setup()
881 while(!m_thread_tracks.empty())
883 delete m_thread_tracks.back();
884 m_thread_tracks.pop_back();
887 for(
int i=0;i<num_threads();++i)
888 m_thread_tracks.push_back(
new lock_buffer);
// Constructor fragment.
// NOTE(review): the signature and some initializer lines (embedded lines
// 895-897) are not visible in this listing.
// Slots are bound to slotfun_join() and slotfun_proxygetdata() on this object.
894 :m_slotjoin(boost::bind(&desync::slotfun_join,
this)),
898 m_slotproxy_getdata(boost::bind(&desync::slotfun_proxygetdata,
this,_1))
// On the root process: allocate one lock buffer per other process.
901 if(shore::mpi::comm_isroot())
902 for(
size_t i=1;i<shore::mpi::comm_size();++i)
903 m_process_tracks.push_back(
new lock_buffer);
// Route data requests to slotfun_proxygetdata().
905 m_data_requester.siggetdata().connect(m_slotproxy_getdata);
// Joins the data requester, then unsubscribes it.
// NOTE(review): the enclosing signature(s) are not visible in this listing;
// the two calls may belong to different functions -- confirm.
912 m_data_requester.join();
913 m_data_requester.unsubscribe();
// Base clause and typedefs of a class template derived from
// parallelization_base.
// NOTE(review): the `template<...> class ...` header line is not visible in
// this listing; member functions are bound as parallelizer::... in the
// constructor.
921 :
public parallelization_base
925 friend class parallelization_core_access;
927 typedef T append_type;
928 typedef T current_type;
// Locking primitives: a recursive mutex, its scoped lock and a condition.
930 typedef boost::recursive_mutex mutex_type;
931 typedef boost::recursive_mutex::scoped_lock lock_type;
932 typedef boost::condition condition_type;
// Copy constructor declared without a body here (presumably to forbid
// copying -- confirm).
944 multiplexer(
const multiplexer &);
// append(): hands one element to the owning parallelizer's input buffer.
950 void append(
const append_type &d)
// Branch for a thread count of zero; its body is not visible in this listing.
952 if(m_parallelizer.m_nthreads==0)
958 m_parallelizer.errcheck();
960 lock_type lock(m_parallelizer.m_mutex);
// If a full batch is already waiting, block until a consumer signals that
// it retrieved data.
// NOTE(review): this is a single `if`, so the predicate is not re-checked
// after wake-up; confirm that is intended.
962 if(m_parallelizer.data_ready())
963 m_parallelizer.m_dataretrieved_condition.wait(lock);
965 m_parallelizer.m_input_buffer.push_back(d);
// Wake one waiting consumer once the buffer reaches the batch size.
967 if(m_parallelizer.data_ready())
968 m_parallelizer.m_dataready_condition.notify_one();
// Fragment of a member function whose signature is not visible in this
// listing (presumably the multiplexer's flush handler -- confirm).
// When worker threads exist: under the mutex, check for errors, set the
// batch size to 0 and wake all waiting consumers. The visible lines then
// call join() on the parallelizer.
974 if(m_parallelizer.m_nthreads>0)
977 lock_type lock(m_parallelizer.m_mutex);
978 m_parallelizer.errcheck();
980 m_parallelizer.m_batchsize=0;
981 m_parallelizer.m_dataready_condition.notify_all();
983 m_parallelizer.join();
// Members of the nested worker type (constructed elsewhere as `thread`).
// NOTE(review): the class header and several signatures are not visible in
// this listing.
// Batch of elements currently being processed by this worker.
1004 std::deque<append_type> m_buffer;
// Stream collecting error text.
1006 std::ostringstream m_errlog;
// Fragment: drop the front element if there is one, then emit the new front
// if any element remains (signature not visible).
1010 if(!m_buffer.empty())
1011 m_buffer.pop_front();
1012 if(!m_buffer.empty())
1013 this->emit(m_buffer.front());
// append(): body not visible in this listing.
1018 void append(
const append_type &d)
// Fragment of the worker's fetch logic; the signature and the bodies of
// several branches are not visible in this listing.
// On the root process: under the parallelizer mutex, wait until a batch is
// ready and swap it into this worker's buffer. The error-thread checks
// before and after the wait have bodies that are not visible here.
1032 if(shore::mpi::comm_isroot())
1034 lock_type lock(m_parallelizer.m_mutex);
1036 if(m_parallelizer.m_error_thread!=0)
1039 while(!m_parallelizer.data_ready())
1040 m_parallelizer.m_dataready_condition.wait(lock);
1042 if(m_parallelizer.m_error_thread!=0)
1045 m_buffer.swap(m_parallelizer.m_input_buffer);
// Request a batch through the data requester. NOTE(review): presumably the
// non-root branch -- confirm.
1049 m_parallelizer.m_data_requester.request(m_buffer);
// Tell one waiter that data was retrieved.
1052 m_parallelizer.m_dataretrieved_condition.notify_one();
// Test for an empty batch; the branch body is not visible in this listing.
1054 if(m_buffer.empty())
// Emit the first element of the batch.
1057 this->emit(m_buffer.front());
// Emit the flush signal.
1060 this->sigflush().
emit();
// onjoin(): under the parallelizer mutex, counts this worker as finished.
// The remaining visible lines record this worker as the error thread, set
// the batch size to 0 and wake one waiter on the data-retrieved condition.
// NOTE(review): the condition guarding the error handling (embedded lines
// 1067-1069) is not visible in this listing.
1063 virtual void onjoin()
1065 lock_type lock(m_parallelizer.m_mutex);
1066 ++m_parallelizer.m_finished_threads;
1070 m_parallelizer.m_error_thread=
this;
1071 m_parallelizer.m_batchsize=0;
1072 m_parallelizer.m_dataretrieved_condition.notify_one();
// Data members of the parallelizer.
// Entry point receiving appended elements.
1085 multiplexer m_multiplexer;
// Group that owns the started worker threads.
1087 boost::thread_group m_thread_g;
// Notified when a batch is ready / when a batch has been taken.
1092 condition_type m_dataready_condition;
1093 condition_type m_dataretrieved_condition;
// Elements collected for the next batch.
1095 std::deque<append_type> m_input_buffer;
// NOTE(review): no use of this pointer is visible in this listing.
1097 std::deque<append_type> *m_process_buffers;
// Heap-allocated worker objects.
1102 std::vector<thread *> m_threads;
// Count of finished workers, incremented in onjoin() and in
// slotfun_proxygetdata() when an empty batch is handed out.
1103 size_t m_finished_threads;
// Worker that reported an error; compared against 0 elsewhere in the file.
1104 thread *m_error_thread;
// Requester used to fetch batches (a deque of elements).
1117 mpi_proxy::requester<std::deque<append_type> > m_data_requester;
// Handler for a data request: under the mutex, waits until a batch is ready,
// then moves the input buffer into `d` by swapping.
1119 void slotfun_proxygetdata(std::deque<append_type> &d)
1121 lock_type lock(m_mutex);
1123 while(!data_ready())
1124 m_dataready_condition.wait(lock);
// An empty batch is counted as one more finished thread.
1126 if(m_input_buffer.empty())
1127 ++m_finished_threads;
// After the swap m_input_buffer holds the previous contents of `d`; clear it.
1129 d.swap(m_input_buffer);
1130 m_input_buffer.clear();
1131 m_dataretrieved_condition.notify_one();
// Fragment of the batch-ready predicate (signature not visible; called as
// data_ready() elsewhere): true once the input buffer holds at least
// m_batchsize elements.
1137 return m_input_buffer.size()>=m_batchsize;
// Fragment of the start routine (signature not visible): launches one thread
// per worker index below m_nthreads, passing each worker by reference.
1142 for(
int i=0;i<m_nthreads;++i)
1143 m_thread_g.create_thread(boost::ref(*m_threads[i]));
// On the root process with more than one MPI process, request a thread from
// the mpi_proxy instance.
1146 if(shore::mpi::comm_isroot()&&(shore::mpi::comm_size()>1))
1147 shore::mpi_proxy::instance().request_thread();
// Fragments of the join / error-check routines; their signatures and
// conditions are not visible in this listing.
// Wait for all worker threads.
1153 m_thread_g.join_all();
// MPI barrier, then join the data requester.
1156 shore::mpi::comm_barrier();
1157 m_data_requester.join();
// Throw a runtime_error carrying the error thread's message text.
// NOTE(review): presumably guarded by a check that m_error_thread is
// non-null -- confirm.
1163 throw std::runtime_error(m_error_thread->errors());
// Fragment: under the mutex, wakes every waiter on the data-ready condition
// (lines in between are not visible).
1173 lock_type lock(m_mutex);
1179 m_dataready_condition.notify_all();
// Sets the base-class thread count to this parallelizer's own m_nthreads,
// replacing the value stored by slotfun_nthreads().
1186 virtual void parallelization_setup()
1188 m_parallelization_numthreads=m_nthreads;
// Constructor initializer list and body.
// NOTE(review): the signature line and some initializer lines are not
// visible in this listing (the embedded line numbers skip).
1194 :m_nthreads(nthreads),
1195 m_multiplexer(*
this),
1196 m_batchsize(batchsize),
1197 m_finished_threads(0),
// Slots are bound to slotfun_join() and slotfun_proxygetdata() on this object.
1200 m_slotjoin(boost::bind(&parallelizer::slotfun_join,
this))
1203 m_slotproxy_getdata(boost::bind(&parallelizer::slotfun_proxygetdata,
this,_1))
// Always allocate one worker, plus one for every further requested thread.
// This uses m_nthreads as passed in, before it is clamped below.
1206 m_threads.push_back(
new thread(*
this));
1208 for(
int i=1;i<m_nthreads;++i)
1209 m_threads.push_back(
new thread(*
this));
// Route data requests to slotfun_proxygetdata().
1211 m_data_requester.siggetdata().connect(m_slotproxy_getdata);
// With more than one MPI process the thread count is clamped to at least 1.
1214 if(shore::mpi::comm_size()>1)
1215 m_nthreads=std::max(1,m_nthreads);
// The assignment below clamps to at least 0.
// NOTE(review): an `else` (embedded line 1216) appears to be missing from
// this listing; as shown this assignment would always run -- confirm.
1217 m_nthreads=std::max(0,m_nthreads);
// Connect the multiplexer to the first worker with operator|.
// NOTE(review): a guarding condition may be missing from this listing
// (embedded lines 1218-1219 are not visible).
1220 m_multiplexer|(*m_threads.front());
// Teardown fragment: clears the input buffer, wakes every waiter on the
// data-ready condition, deletes all worker objects and unsubscribes the
// data requester.
// NOTE(review): the enclosing signature and intermediate lines are not
// visible in this listing; presumably the destructor -- confirm.
1231 m_input_buffer.clear();
1232 m_dataready_condition.notify_all();
1236 while(!m_threads.empty())
1238 delete m_threads.back();
1239 m_threads.pop_back();
1243 m_data_requester.unsubscribe();
// Const accessor returning an int; the body is not visible in this listing
// (presumably returns m_nthreads -- confirm).
1247 int nthreads()
const
// Eight two-parameter template heads; the declarations they introduce are
// not visible in this listing.
// NOTE(review): presumably forward declarations of the eight connection
// functions whose bodies appear at the end of the file -- confirm.
1254 template<
typename T,
typename U>
1258 template<
typename T,
typename U>
1262 template<
typename T,
typename U>
1266 template<
typename T,
typename U>
1270 template<
typename T,
typename U>
1274 template<
typename T,
typename U>
1278 template<
typename T,
typename U>
1282 template<
typename T,
typename U>
// Class that the pipeline elements above declare as a friend. The pci_*
// classes and the static dump()/connect() functions that follow in this file
// read the private members of those elements.
1285 class parallelization_core_access
// One single-parameter and eight two-parameter template heads; the
// declarations they introduce are not visible in this listing (presumably
// friend declarations for the free functions at the end of the file --
// confirm).
1289 template<
typename T>
1292 template<
typename T,
typename U>
1294 template<
typename T,
typename U>
1296 template<
typename T,
typename U>
1298 template<
typename T,
typename U>
1300 template<
typename T,
typename U>
1302 template<
typename T,
typename U>
1304 template<
typename T,
typename U>
1306 template<
typename T,
typename U>
// Default constructor declared without a body here (presumably private, to
// prevent instantiation -- confirm).
1309 parallelization_core_access();
// Connection initializer linking the lock_links of two serial elements.
// NOTE(review): member declarations, the constructor signature and the
// init() signature are not visible in this listing.
1312 template<
typename T,
typename U>
1313 class pci_serial_serial
1314 :
public parallel_connection_init
// Stores the addresses of both elements.
1324 :m_serial1(&s1),m_serial2(&s2)
// Both elements must report the same thread count.
1329 if(m_serial1->num_threads()!=m_serial2->num_threads())
1330 throw std::logic_error(
"tried to connect pipeline elements"
1331 " with differing number of threads");
// For each thread index, connect the four signals of the first element's
// lock_link to the matching slots of the second element's lock_link.
1332 for(
int i=0;i<m_serial1->num_threads();++i)
1334 typename serial<T>::lock_link &ll1=*(m_serial1->m_locklinks[i]);
1335 typename serial<U>::lock_link &ll2=*(m_serial2->m_locklinks[i]);
1337 (ll1.sigpostlock().connect(ll2.slotpostlock()));
1338 (ll1.sigpreunlock().connect(ll2.slotpreunlock()));
1339 (ll1.sigpostunlock().connect(ll2.slotpostunlock()));
1340 (ll1.sigflushthread().connect(ll2.slotflushthread()));
// Connection initializer linking the per-thread objects of two parallel
// elements.
// NOTE(review): the init() signature is not visible in this listing.
1345 template<
typename T,
typename U>
1346 class pci_parallel_parallel
1347 :
public parallel_connection_init
// Non-owning pointers to the two elements.
1351 parallel<T> * m_parallel1;
1352 parallel<U> * m_parallel2;
1356 pci_parallel_parallel(parallel<T> & s1,parallel<U> & s2)
1357 :m_parallel1(&s1),m_parallel2(&s2)
// Both elements must report the same thread count.
1362 if(m_parallel1->num_threads()!=m_parallel2->num_threads())
1363 throw std::logic_error(
"tried to connect pipeline elements"
1364 " with differing number of threads");
// Connect object i of the first element to object i of the second with
// operator|.
// NOTE(review): with a thread count of 0 this loop connects nothing, although
// parallel::parallelization_setup() still creates one object -- confirm
// whether a line is missing from this listing.
1365 for(
int i=0;i<m_parallel1->num_threads();++i)
1366 (*(m_parallel1->m_objects[i]))|(*(m_parallel2->m_objects[i]));
// Connection initializer linking the workers of a parallelizer to the
// per-thread objects of a parallel element.
// NOTE(review): the init() signature is not visible in this listing.
1370 template<
typename T,
typename U>
1371 class pci_parallelizer_parallel
1372 :
public parallel_connection_init
// Non-owning pointers to the two elements.
1376 parallelizer<T> * m_parallelizer;
1377 parallel<U> * m_parallel;
1381 pci_parallelizer_parallel(parallelizer<T> & s1,parallel<U> & s2)
1382 :m_parallelizer(&s1),m_parallel(&s2)
// Both elements must report the same thread count.
1387 if(m_parallelizer->num_threads()!=m_parallel->num_threads())
1388 throw std::logic_error(
"tried to connect pipeline elements"
1389 " with differing number of threads");
// Worker 0 is always connected to object 0; further pairs follow for every
// thread index starting at 1.
1390 (*(m_parallelizer->m_threads[0]))|(*(m_parallel->m_objects[0]));
1391 for(
int i=1;i<m_parallelizer->num_threads();++i)
1392 (*(m_parallelizer->m_threads[i]))|(*(m_parallel->m_objects[i]));
// Connection initializer linking the per-thread objects of a parallel
// element to the thread tracks of a sync element.
// NOTE(review): the m_sync member declaration and the init() signature are
// not visible in this listing.
1396 template<
typename T,
typename U>
1397 class pci_parallel_sync
1398 :
public parallel_connection_init
// Non-owning pointer to the upstream element.
1402 parallel<T> * m_parallel;
1407 pci_parallel_sync(parallel<T> & s1,sync<U> & s2)
1408 :m_parallel(&s1),m_sync(&s2)
// Both elements must report the same thread count.
1413 if(m_parallel->num_threads()!=m_sync->num_threads())
1414 throw std::logic_error(
"tried to connect pipeline elements"
1415 " with differing number of threads");
// Object 0 is always connected to track 0; further pairs follow for every
// thread index starting at 1.
1416 (*(m_parallel->m_objects[0]))|(*(m_sync->m_thread_tracks[0]));
1417 for(
int i=1;i<m_parallel->num_threads();++i)
1418 (*(m_parallel->m_objects[i]))|(*(m_sync->m_thread_tracks[i]));
// Connection initializer linking the tracks of a sync element to a serial
// element's object and lock_links.
// NOTE(review): the m_sync member declaration, the init() signature and
// embedded lines 1444-1445 (possibly an `else`) are not visible in this
// listing.
1422 template<
typename T,
typename U>
1423 class pci_sync_serial
1424 :
public parallel_connection_init
// Non-owning pointer to the downstream element.
1429 serial<U> * m_serial;
1433 pci_sync_serial(sync<T> & s1,serial<U> & s2)
1434 :m_sync(&s1),m_serial(&s2)
// Both elements must report the same thread count.
1439 if(m_sync->num_threads()!=m_serial->num_threads())
1440 throw std::logic_error(
"tried to connect pipeline elements"
1441 " with differing number of threads");
// Thread count 0: connect the single thread track straight to the serial
// object. The object pointer is not tested on this path.
1442 if(m_sync->num_threads()==0)
1443 (*(m_sync->m_thread_tracks[0]))|(*(m_serial->m_object));
// Per thread: connect the track to the serial object when one exists, then
// connect the track's lock and flush signals to the lock_link slots.
1446 for(
int i=0;i<m_sync->num_threads();++i)
1448 if(m_serial->m_object)
1449 (*(m_sync->m_thread_tracks[i]))|(*(m_serial->m_object));
1451 typename serial<U>::lock_link &ll=*(m_serial->m_locklinks[i]);
1453 m_sync->m_thread_tracks[i]->m_sigpostlock.connect(ll.slotpostlock());
1454 m_sync->m_thread_tracks[i]->m_sigpreunlock.connect(ll.slotpreunlock());
1455 m_sync->m_thread_tracks[i]->m_sigpostunlock.connect(ll.slotpostunlock());
1456 m_sync->m_thread_tracks[i]->sigflush().connect(ll.slotflushthread());
// When a serial object exists: connect every proxy track to it, then drop
// all connections to the object's flush slot and connect that slot to the
// sync element's m_sigsyncflush signal instead.
1459 if(m_serial->m_object)
1461 for(
size_t i=1;i<shore::mpi::comm_size();++i)
1462 (*(m_sync->m_proxy_tracks[i-1]))|(*(m_serial->m_object));
1464 m_serial->m_object->slotflush().disconnect_all();
1466 m_sync->m_sigsyncflush.connect(m_serial->m_object->slotflush());
// Connection initializer linking a serial element's lock_links to the
// multiplexer of a desync element.
// NOTE(review): the init() signature is not visible in this listing.
1472 template<
typename T,
typename U>
1473 class pci_serial_desync
1474 :
public parallel_connection_init
// Non-owning pointers to the two elements.
1478 serial<T> * m_serial;
1479 desync<U> * m_desync;
1483 pci_serial_desync(serial<T> & s1,desync<U> & s2)
1484 :m_serial(&s1),m_desync(&s2)
// Both elements must report the same thread count.
1489 if(m_serial->num_threads()!=m_desync->num_threads())
1490 throw std::logic_error(
"tried to connect pipeline elements"
1491 " with differing number of threads");
// For each thread index, connect the lock_link's four signals to the
// matching slots of the desync multiplexer.
1492 for(
int i=0;i<m_serial->num_threads();++i)
1494 typename serial<T>::lock_link &ll=*(m_serial->m_locklinks[i]);
1496 (ll.sigpostlock().connect(m_desync->m_multiplexer.slotpostlock()));
1497 (ll.sigpreunlock().connect(m_desync->m_multiplexer.slotpreunlock()));
1498 (ll.sigpostunlock().connect(m_desync->m_multiplexer.slotpostunlock()));
1499 (ll.sigflushthread().connect(m_desync->m_multiplexer.slotflushthread()));
// Connection initializer linking a desync element to the per-thread objects
// of a parallel element.
// NOTE(review): the init() signature is not visible in this listing.
1504 template<
typename T,
typename U>
1505 class pci_desync_parallel
1506 :
public parallel_connection_init
// Non-owning pointers to the two elements.
1510 desync<T> * m_desync;
1511 parallel<U> * m_parallel;
1515 pci_desync_parallel(desync<T> & s1,parallel<U> & s2)
1516 :m_desync(&s1),m_parallel(&s2)
// Both elements must report the same thread count.
1521 if(m_desync->num_threads()!=m_parallel->num_threads())
1522 throw std::logic_error(
"tried to connect pipeline elements"
1523 " with differing number of threads");
// Thread count 0: connect the multiplexer directly to object 0.
1524 if(m_desync->num_threads()==0)
1525 (m_desync->m_multiplexer)|(*(m_parallel->m_objects[0]));
// Otherwise connect lock buffer i to object i; this loop does not run when
// the thread count is 0.
1526 for(
int i=0;i<m_desync->num_threads();++i)
1527 (*(m_desync->m_thread_tracks[i]))|(*(m_parallel->m_objects[i]));
// Starts a pipeline at the given serial element: emits a thread count of 0
// on m_signumthreads and emits the join signal.
// NOTE(review): the body of the comm_isroot() branch and the nesting of the
// join emit (embedded lines 1538-1540) are not visible in this listing.
1532 template<
typename T>
1533 static void dump(serial<T> & ser)
1535 ser.m_signumthreads.emit(0);
1537 if(shore::mpi::comm_isroot())
1541 ser.m_sigjoin.emit();
// Connects two serial elements: forwards join and thread-count signals,
// connects the wrapped objects with operator| when both exist, and registers
// a pci_serial_serial initializer on the first element.
// NOTE(review): the return statement is not visible in this listing.
1544 template<
typename T,
typename U>
1545 static serial<U> &connect(serial<T> &ser1,serial<U> &ser2)
1547 ser1.m_sigjoin.connect(ser2.m_slotjoin);
1548 ser1.m_signumthreads.connect(ser2.m_slotnumthreads);
// Either object pointer may be null, so both are tested.
1550 if((ser1.m_object!=0)&&(ser2.m_object!=0))
1551 (*ser1.m_object)|(*ser2.m_object);
1553 ser1.m_pci.push_back(
new pci_serial_serial<T,U>(ser1,ser2));
// Connects two parallel elements: forwards join and thread-count signals and
// registers a pci_parallel_parallel initializer on the first element.
// NOTE(review): the return statement is not visible in this listing.
1558 template<
typename T,
typename U>
1559 static parallel<U> &connect(parallel<T> &par1,parallel<U> &par2)
1561 par1.m_sigjoin.connect(par2.m_slotjoin);
1562 par1.m_signumthreads.connect(par2.m_slotnumthreads);
1564 par1.m_pci.push_back(
new pci_parallel_parallel<T,U>(par1,par2));
// Connects a serial element to a parallelizer: forwards join and
// thread-count signals and connects the serial object to the parallelizer's
// multiplexer with operator|.
// NOTE(review): the line before the operator| statement (embedded line 1575,
// possibly a null check on m_object) and the return statement are not
// visible in this listing.
1569 template<
typename T,
typename U>
1570 static parallelizer<U> &connect(serial<T> &ser,parallelizer<U> &paz)
1572 ser.m_sigjoin.connect(paz.m_slotjoin);
1573 ser.m_signumthreads.connect(paz.m_slotnumthreads);
1576 (*ser.m_object)|paz.m_multiplexer;
// Connects a parallelizer to a parallel element: forwards join and
// thread-count signals and registers a pci_parallelizer_parallel initializer
// on the parallelizer.
// NOTE(review): the return statement is not visible in this listing.
1580 template<
typename T,
typename U>
1581 static parallel<U> &connect(parallelizer<T> &paz,parallel<U> &par)
1583 paz.m_sigjoin.connect(par.m_slotjoin);
1584 paz.m_signumthreads.connect(par.m_slotnumthreads);
1586 paz.m_pci.push_back(
new pci_parallelizer_parallel<T,U>(paz,par));
// Connects a parallel element to a sync element: forwards join and
// thread-count signals and registers a pci_parallel_sync initializer on the
// parallel element.
// NOTE(review): the return statement is not visible in this listing.
1591 template<
typename T,
typename U>
1592 static sync<U> &connect(parallel<T> &par,sync<U> &syn)
1594 par.m_sigjoin.connect(syn.m_slotjoin);
1595 par.m_signumthreads.connect(syn.m_slotnumthreads);
1597 par.m_pci.push_back(
new pci_parallel_sync<T,U>(par,syn));
// Connects a sync element to a serial element: forwards join and
// thread-count signals and registers a pci_sync_serial initializer on the
// sync element.
// NOTE(review): the return statement is not visible in this listing.
1602 template<
typename T,
typename U>
1603 static serial<U> &connect(sync<T> &syn,serial<U> &ser)
1605 syn.m_sigjoin.connect(ser.m_slotjoin);
1606 syn.m_signumthreads.connect(ser.m_slotnumthreads);
1608 syn.m_pci.push_back(
new pci_sync_serial<T,U>(syn,ser));
// Connects a serial element to a desync element: forwards join and
// thread-count signals, connects the serial object to the desync multiplexer
// with operator| and registers a pci_serial_desync initializer on the serial
// element.
// NOTE(review): no null check on ser.m_object is visible before it is
// dereferenced, and the return statement is not visible in this listing.
1613 template<
typename T,
typename U>
1614 static desync<U> &connect(serial<T> &ser,desync<U> &dsc)
1616 ser.m_sigjoin.connect(dsc.m_slotjoin);
1617 ser.m_signumthreads.connect(dsc.m_slotnumthreads);
1619 (*ser.m_object)|dsc.m_multiplexer;
1621 ser.m_pci.push_back(
new pci_serial_desync<T,U>(ser,dsc));
// Connects a desync element to a parallel element: forwards join and
// thread-count signals and registers a pci_desync_parallel initializer on
// the desync element.
// NOTE(review): the return statement is not visible in this listing.
1626 template<
typename T,
typename U>
1627 static parallel<U> &connect(desync<T> &dsc,parallel<U> &par)
1629 dsc.m_sigjoin.connect(par.m_slotjoin);
1630 dsc.m_signumthreads.connect(par.m_slotnumthreads);
1631 dsc.m_pci.push_back(
new pci_desync_parallel<T,U>(dsc,par));
// Free-function entry point: forwards to
// parallelization_core_access::dump() for the given serial element.
1637 template<
typename T>
1638 void dump(serial<T> & ser)
1640 parallelization_core_access::dump(ser);
// Bodies of eight free function templates; each forwards its two arguments
// to the matching parallelization_core_access::connect() overload and
// returns the result.
// NOTE(review): the signature lines are not visible in this listing
// (presumably operator| overloads for each supported element pair --
// confirm).
// serial -> serial
1643 template<
typename T,
typename U>
1646 return parallelization_core_access::connect(ser1,ser2);
// parallel -> parallel
1649 template<
typename T,
typename U>
1652 return parallelization_core_access::connect(par1,par2);
// serial -> parallelizer
1655 template<
typename T,
typename U>
1658 return parallelization_core_access::connect(ser,paz);
// parallelizer -> parallel
1661 template<
typename T,
typename U>
1664 return parallelization_core_access::connect(paz,par);
// parallel -> sync
1667 template<
typename T,
typename U>
1670 return parallelization_core_access::connect(par,syn);
// sync -> serial
1673 template<
typename T,
typename U>
1676 return parallelization_core_access::connect(syn,ser);
// serial -> desync
1679 template<
typename T,
typename U>
1682 return parallelization_core_access::connect(ser,dsc);
// desync -> parallel
1685 template<
typename T,
typename U>
1688 return parallelization_core_access::connect(dsc,par);
1693 #endif // SHORE_PARALLEL_PARALLEL_HPP__