26 #ifndef SHORE_PARALLEL_PROXY_HPP__
27 #define SHORE_PARALLEL_PROXY_HPP__
35 #include <boost/thread/thread.hpp>
36 #include <boost/thread/recursive_mutex.hpp>
37 #include <boost/thread/condition.hpp>
50 typedef boost::recursive_mutex mutex_type;
51 typedef boost::recursive_mutex::scoped_lock lock_type;
52 typedef boost::condition condition_type;
54 static mutex_type m_mutex;
58 friend class communicator_base;
63 class communicator_base
80 lock_type lock(mpi_proxy::instance().mutex());
82 if(shore::mpi::comm_isroot())
84 if(mpi_proxy::instance().m_nexttag>=shore::mpi::env_maxtag())
85 throw std::runtime_error(
"mpi_proxy::communicator_base:"
86 " no more free tags");
88 m_tag=mpi_proxy::instance().m_nexttag;
89 ++(mpi_proxy::instance().m_nexttag);
91 mpi_proxy::instance().m_tagmap[m_tag]=
92 shore::mpi::comm_size()-1;
93 mpi_proxy::instance().m_comap[m_tag]=
this;
96 shore::mpi::comm_broadcast(m_tag);
99 virtual ~communicator_base() {}
101 virtual void rootaction(
const int src)=0;
105 if(!shore::mpi::comm_isroot())
107 lock_type lock(mpi_proxy::instance().mutex());
108 shore::mpi::comm_send(0,3,m_tag);
109 shore::mpi::comm_recv(0,3);
117 :
public communicator_base,
122 typedef boost::recursive_mutex mutex_type;
123 typedef boost::recursive_mutex::scoped_lock lock_type;
124 typedef boost::condition condition_type;
134 condition_type m_emit_condition;
135 condition_type m_ready_condition;
142 boost::thread *m_thread;
144 submitter(
const submitter &);
145 submitter &operator=(
const submitter &);
154 lock_type lock(m_mutex);
156 m_ready_condition.notify_one();
160 m_emit_condition.wait(lock);
162 m_sigdata.emit(m_buf);
165 m_ready_condition.notify_one();
172 :shore::thread(
"mpi_submitter"),
188 lock_type lock(m_mutex);
190 m_ready_condition.wait(lock);
192 m_emit_condition.notify_one();
196 throw std::runtime_error(errors());
200 void submit(
const T &d)
202 lock_type lock(mpi_proxy::instance().mutex());
203 shore::mpi::comm_send(0,get_tag(),d);
206 virtual void rootaction(
const int src)
208 #ifdef SHORE_MPI_NO_COMMTHREADS
209 shore::mpi::comm_recv(src,get_tag(),m_buf);
211 m_sigdata.emit(m_buf);
213 #else // !SHORE_MPI_NO_COMMTHREADS
215 lock_type lock(m_mutex);
218 m_thread=
new boost::thread(boost::ref(*
this));
221 m_ready_condition.wait(lock);
224 throw std::runtime_error(errors());
226 shore::mpi::comm_recv(src,get_tag(),m_buf);
229 m_emit_condition.notify_one();
231 #endif // !SHORE_MPI_NO_COMMTHREADS
239 int get_source()
const
248 :
public communicator_base,
253 typedef boost::recursive_mutex mutex_type;
254 typedef boost::recursive_mutex::scoped_lock lock_type;
255 typedef boost::condition condition_type;
264 condition_type m_emit_condition;
265 condition_type m_ready_condition;
272 boost::thread *m_thread;
275 requester(
const requester &);
276 requester &operator=(
const requester &);
284 lock_type lock(m_mutex);
286 m_ready_condition.notify_one();
290 m_emit_condition.wait(lock);
295 m_siggetdata.emit(m_rootbuf);
301 shore::mpi::comm_send(m_target,get_tag(),m_rootbuf);
305 m_ready_condition.notify_one();
312 :shore::thread(
"mpi_requester"),
328 lock_type lock(m_mutex);
330 m_ready_condition.wait(lock);
332 m_emit_condition.notify_one();
336 throw std::runtime_error(errors());
341 void request(T &result)
343 lock_type lock(mpi_proxy::instance().mutex());
344 shore::mpi::comm_send(0,get_tag());
345 shore::mpi::comm_recv(0,get_tag(),result);
348 virtual void rootaction(
const int trg)
350 #ifdef SHORE_MPI_NO_COMMTHREADS
351 shore::mpi::comm_recv(trg,get_tag());
353 m_siggetdata.emit(m_rootbuf);
354 shore::mpi::comm_send(trg,get_tag(),m_rootbuf);
356 #else // !SHORE_MPI_NO_COMMTHREADS
358 lock_type lock(m_mutex);
361 m_thread=
new boost::thread(boost::ref(*
this));
364 m_ready_condition.wait(lock);
367 throw std::runtime_error(errors());
369 shore::mpi::comm_recv(trg,get_tag());
372 m_emit_condition.notify_one();
374 #endif // !SHORE_MPI_NO_COMMTHREADS
382 const int get_target()
const
390 static mpi_proxy* m_instance;
393 condition_type m_ready_condition;
394 boost::thread *m_thread;
396 std::map<int,communicator_base*> m_comap;
397 std::map<int,int> m_tagmap;
404 mpi_proxy(
const mpi_proxy &);
405 mpi_proxy &operator=(
const mpi_proxy &);
411 static mpi_proxy &instance();
415 void request_thread();
420 void rootmsg(
const std::string& msg);
427 #endif // SHORE_PARALLEL_PROXY_HPP__