SHORE API
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
message_passing_proxy.hpp
Go to the documentation of this file.
1 
2 /*
3  * Copyright 2008,2009,2010,2011,2012 Stephan Ossowski, Korbinian Schneeberger,
4  * Felix Ott, Joerg Hagmann, Alf Scotland, Sebastian Bender
5  *
6  * This file is part of SHORE.
7  *
8  * SHORE is free software: you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License as published by
10  * the Free Software Foundation, either version 3 of the License, or
11  * (at your option) any later version.
12  *
13  * SHORE is distributed in the hope that it will be useful,
14  * but WITHOUT ANY WARRANTY; without even the implied warranty of
15  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16  * GNU General Public License for more details.
17  *
18  * You should have received a copy of the GNU General Public License
19  * along with SHORE. If not, see <http://www.gnu.org/licenses/>.
20  */
21 
25 
26 #ifndef SHORE_PARALLEL_PROXY_HPP__
27 #define SHORE_PARALLEL_PROXY_HPP__
28 
29 #ifdef WITH_MPI
30 
31 #include <iostream>
32 #include <stdexcept>
33 #include <map>
34 
35 #include <boost/thread/thread.hpp>
36 #include <boost/thread/recursive_mutex.hpp>
37 #include <boost/thread/condition.hpp>
38 
42 
43 namespace shore {
44 
46 class mpi_proxy
47 {
48  private:
49 
50  typedef boost::recursive_mutex mutex_type;
51  typedef boost::recursive_mutex::scoped_lock lock_type;
52  typedef boost::condition condition_type;
53 
54  static mutex_type m_mutex;
55 
56  public:
57 
58  friend class communicator_base;
59 
63  class communicator_base
64  {
65  private:
66 
67  int m_tag;
68 
69  protected:
70 
71  int get_tag()
72  {
73  return m_tag;
74  }
75 
76  public:
77 
78  communicator_base()
79  {
80  lock_type lock(mpi_proxy::instance().mutex());
81 
82  if(shore::mpi::comm_isroot())
83  {
84  if(mpi_proxy::instance().m_nexttag>=shore::mpi::env_maxtag())
85  throw std::runtime_error("mpi_proxy::communicator_base:"
86  " no more free tags");
87 
88  m_tag=mpi_proxy::instance().m_nexttag;
89  ++(mpi_proxy::instance().m_nexttag);
90 
91  mpi_proxy::instance().m_tagmap[m_tag]=
92  shore::mpi::comm_size()-1;
93  mpi_proxy::instance().m_comap[m_tag]=this;
94  }
95 
96  shore::mpi::comm_broadcast(m_tag);
97  }
98 
99  virtual ~communicator_base() {}
100 
101  virtual void rootaction(const int src)=0;
102 
103  void unsubscribe()
104  {
105  if(!shore::mpi::comm_isroot())
106  {
107  lock_type lock(mpi_proxy::instance().mutex());
108  shore::mpi::comm_send(0,3,m_tag);
109  shore::mpi::comm_recv(0,3);
110  }
111  }
112  };
113 
115  template<typename T>
116  class submitter
117  :public communicator_base,
118  public shore::thread
119  {
120  private:
121 
122  typedef boost::recursive_mutex mutex_type;
123  typedef boost::recursive_mutex::scoped_lock lock_type;
124  typedef boost::condition condition_type;
125 
126  int m_source;
127 
128  T m_buf;
129 
130  shore::signal<const T&> m_sigdata;
131 
132 
133  mutex_type m_mutex;
134  condition_type m_emit_condition;
135  condition_type m_ready_condition;
136 
139  bool m_ready;
140  bool m_join;
141 
142  boost::thread *m_thread;
143 
144  submitter(const submitter &);
145  submitter &operator=(const submitter &);
146 
147 
148  protected:
149 
152  virtual void run()
153  {
154  lock_type lock(m_mutex);
155  m_ready=true;
156  m_ready_condition.notify_one();
157 
158  while(!m_join)
159  {
160  m_emit_condition.wait(lock);
161  if(!m_join)
162  m_sigdata.emit(m_buf);
163 
164  m_ready=true;
165  m_ready_condition.notify_one();
166  }
167  }
168 
169  public:
170 
171  submitter()
172  :shore::thread("mpi_submitter"),
173  m_ready(false),
174  m_join(false),
175  m_thread(0)
176  {}
177 
178  ~submitter()
179  {
180  delete m_thread;
181  }
182 
183  void join()
184  {
185  if(m_thread!=0)
186  {
187  {
188  lock_type lock(m_mutex);
189  while(!m_ready)
190  m_ready_condition.wait(lock);
191  m_join=true;
192  m_emit_condition.notify_one();
193  }
194  m_thread->join();
195  if(status())
196  throw std::runtime_error(errors());
197  }
198  }
199 
200  void submit(const T &d)
201  {
202  lock_type lock(mpi_proxy::instance().mutex());
203  shore::mpi::comm_send(0,get_tag(),d);
204  }
205 
206  virtual void rootaction(const int src)
207  {
208 #ifdef SHORE_MPI_NO_COMMTHREADS
209  shore::mpi::comm_recv(src,get_tag(),m_buf);
210  m_source=src;
211  m_sigdata.emit(m_buf);
212 
213 #else // !SHORE_MPI_NO_COMMTHREADS
214 
215  lock_type lock(m_mutex);
216 
217  if(m_thread==0)
218  m_thread=new boost::thread(boost::ref(*this));
219 
220  while(!m_ready)
221  m_ready_condition.wait(lock);
222 
223  if(status())
224  throw std::runtime_error(errors());
225 
226  shore::mpi::comm_recv(src,get_tag(),m_buf);
227  m_source=src;
228  m_ready=false;
229  m_emit_condition.notify_one();
230 
231 #endif // !SHORE_MPI_NO_COMMTHREADS
232  }
233 
234  shore::signal<const T&>& sigdata()
235  {
236  return m_sigdata;
237  }
238 
239  int get_source() const
240  {
241  return m_source;
242  }
243  };
244 
246  template<typename T>
247  class requester
248  :public communicator_base,
249  public shore::thread
250  {
251  private:
252 
253  typedef boost::recursive_mutex mutex_type;
254  typedef boost::recursive_mutex::scoped_lock lock_type;
255  typedef boost::condition condition_type;
256 
257  T m_rootbuf;
258  int m_target;
259 
260  shore::signal<T &> m_siggetdata;
261 
262 
263  mutex_type m_mutex;
264  condition_type m_emit_condition;
265  condition_type m_ready_condition;
266 
269  bool m_ready;
270  bool m_join;
271 
272  boost::thread *m_thread;
273 
274 
275  requester(const requester &);
276  requester &operator=(const requester &);
277 
278  protected:
279 
282  virtual void run()
283  {
284  lock_type lock(m_mutex);
285  m_ready=true;
286  m_ready_condition.notify_one();
287 
288  for(;;)
289  {
290  m_emit_condition.wait(lock);
291 
292  if(m_join)
293  break;
294 
295  m_siggetdata.emit(m_rootbuf);
296 
297  // will send while comm_probe() is
298  // potentially running, there maybe
299  // problems if MPI_THREAD_MULTIPLE is
300  // unsupported
301  shore::mpi::comm_send(m_target,get_tag(),m_rootbuf);
302 
303  m_ready=true;
304 
305  m_ready_condition.notify_one();
306  }
307  }
308 
309  public:
310 
311  requester()
312  :shore::thread("mpi_requester"),
313  m_ready(false),
314  m_join(false),
315  m_thread(0)
316  {}
317 
318  ~requester()
319  {
320  delete m_thread;
321  }
322 
323  void join()
324  {
325  if(m_thread!=0)
326  {
327  {
328  lock_type lock(m_mutex);
329  while(!m_ready)
330  m_ready_condition.wait(lock);
331  m_join=true;
332  m_emit_condition.notify_one();
333  }
334  m_thread->join();
335  if(status())
336  throw std::runtime_error(errors());
337  }
338  unsubscribe();
339  }
340 
341  void request(T &result)
342  {
343  lock_type lock(mpi_proxy::instance().mutex());
344  shore::mpi::comm_send(0,get_tag());
345  shore::mpi::comm_recv(0,get_tag(),result);
346  }
347 
348  virtual void rootaction(const int trg)
349  {
350 #ifdef SHORE_MPI_NO_COMMTHREADS
351  shore::mpi::comm_recv(trg,get_tag());
352  m_target=trg;
353  m_siggetdata.emit(m_rootbuf);
354  shore::mpi::comm_send(trg,get_tag(),m_rootbuf);
355 
356 #else // !SHORE_MPI_NO_COMMTHREADS
357 
358  lock_type lock(m_mutex);
359 
360  if(m_thread==0)
361  m_thread=new boost::thread(boost::ref(*this));
362 
363  while(!m_ready)
364  m_ready_condition.wait(lock);
365 
366  if(status())
367  throw std::runtime_error(errors());
368 
369  shore::mpi::comm_recv(trg,get_tag());
370  m_target=trg;
371  m_ready=false;
372  m_emit_condition.notify_one();
373 
374 #endif // !SHORE_MPI_NO_COMMTHREADS
375  }
376 
377  shore::signal<T&>& siggetdata()
378  {
379  return m_siggetdata;
380  }
381 
382  const int get_target() const
383  {
384  return m_target;
385  }
386  };
387 
388  private:
389 
390  static mpi_proxy* m_instance;
391 
392  bool m_ready;
393  condition_type m_ready_condition;
394  boost::thread *m_thread;
395 
396  std::map<int,communicator_base*> m_comap;
397  std::map<int,int> m_tagmap;
398 
399  int m_nexttag;
400 
401 
402  mpi_proxy();
403 
404  mpi_proxy(const mpi_proxy &);
405  mpi_proxy &operator=(const mpi_proxy &);
406 
407  public:
408 
409  ~mpi_proxy();
410 
411  static mpi_proxy &instance();
412 
413  void operator()();
414 
415  void request_thread();
416  void join_thread();
417 
418  mutex_type &mutex();
419 
420  void rootmsg(const std::string& msg);
421 };
422 
423 } // namespace
424 
425 #endif // WITH_MPI
426 
427 #endif // SHORE_PARALLEL_PROXY_HPP__
428