SHORE API
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
message_passing.hpp
Go to the documentation of this file.
1 
2 /*
3  * Copyright 2008,2009,2010,2011,2012 Stephan Ossowski, Korbinian Schneeberger,
4  * Felix Ott, Joerg Hagmann, Alf Scotland, Sebastian Bender
5  *
6  * This file is part of SHORE.
7  *
8  * SHORE is free software: you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License as published by
10  * the Free Software Foundation, either version 3 of the License, or
11  * (at your option) any later version.
12  *
13  * SHORE is distributed in the hope that it will be useful,
14  * but WITHOUT ANY WARRANTY; without even the implied warranty of
15  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16  * GNU General Public License for more details.
17  *
18  * You should have received a copy of the GNU General Public License
19  * along with SHORE. If not, see <http://www.gnu.org/licenses/>.
20  */
21 
25 
26 #ifndef SHORE_PROCESSING_MESSAGE_PASSING_HPP__
27 #define SHORE_PROCESSING_MESSAGE_PASSING_HPP__
28 
29 #include <algorithm>
30 #include <sstream>
31 #include <stdexcept>
32 #include <vector>
33 
34 #ifdef WITH_MPI
35  #define MPICH_IGNORE_CXX_SEEK 1
36  #include <mpi.h>
37  #include <boost/serialization/string.hpp>
38  #include <boost/archive/text_oarchive.hpp>
39  #include <boost/archive/text_iarchive.hpp>
41  #define SMPI_INFO(x) std::string(x)
42 #else
43  #define SMPI_INFO(x) std::string()
45 #endif
46 
47 namespace shore {
48 
50 class mpi
51 {
52  private:
53 
54  static mpi m_instance;
55 
56  class exception
57  :public std::runtime_error
58  {
59  public:
60 
61  exception(const std::string& label,const int)
62  :std::runtime_error("MPI error: "+label)
63  {}
64  };
65 
66  bool m_initialized;
67 
68  size_t m_comm_bufsize;
69  char* m_comm_buffer;
70 
71  mpi();
72  ~mpi();
73 
74  // no copy construction or assignment allowed
75  mpi(const mpi&);
76  mpi& operator=(const mpi&);
77 
78 
79  #ifdef WITH_MPI
80 
81  void ensure_bufsize(const size_t size);
82 
83  template<typename T>
84  size_t serialize(const T& what)
85  {
86  std::ostringstream str;
87  {
88  boost::archive::text_oarchive arch(str);
89  arch<<what;
90  }
91 
92  const std::string serial=str.str();
93  //if(serial.size()>=m_instance.m_comm_bufsize) throw std::runtime_error(
94  // "mpi::serialize: message size limit exceeded");
95  m_instance.ensure_bufsize(serial.size()+1);
96 
97  std::copy(serial.begin(),serial.end(),m_instance.m_comm_buffer);
98 
99  // send around zero terminated strings, since
100  // MPI_Status::count does not seem to be standard
101  m_instance.m_comm_buffer[serial.size()]=0;
102 
103  return serial.size()+1;
104  }
105 
106  template<typename T>
107  size_t deserialize(T& what,const size_t offset=0)
108  {
109  const std::string serial(m_comm_buffer+offset);
110 
111  std::istringstream str(serial);
112  {
113  boost::archive::text_iarchive arch(str);
114  arch>>what;
115  }
116 
117  return serial.size()+1;
118  }
119  #endif // WITH_MPI
120 
121  public:
122 
123  class status
124  {
125  private:
126 
127  friend class mpi;
128 
129  status();
130 
131  public:
132 
133  int source;
134  int tag;
135  };
136 
138  static void no_mpi(const std::string& name);
139 
140  static void start(int ac,char**av);
141  static void stop();
142 
143 
144  static int env_maxtag();
145 
146  static size_t comm_size();
147  static size_t comm_rank();
148  static bool comm_isroot();
149 
150  static void comm_barrier();
151 
152  #ifdef WITH_MPI
153  static status comm_probe();
154 
155  static void comm_send(const int dest,const int tag);
156 
157  static status comm_recv(const int src,const int tag);
158 
159  static void comm_send(const int dest,const int tag,const int &what);
160 
161  static status comm_recv(const int src,const int tag,int &what);
162 
163  template<typename T>
164  static void comm_send(const int dest,const int tag,const T& what)
165  {
166  const int msgsize=m_instance.serialize(what);
167 
168  comm_send(dest,tag,msgsize);
169 
170  const int err=MPI_Ssend(m_instance.m_comm_buffer,
171  msgsize,MPI_CHAR,dest,tag,MPI_COMM_WORLD);
172 
173  if(err!=MPI_SUCCESS)
174  throw exception("MPI_Ssend (3)",err);
175  }
176 
177  template<typename T>
178  static status comm_recv(const int src,const int tag,T& what)
179  {
180  status ret;
181 
182  int msgsize=0;
183  ret=comm_recv(src,tag,msgsize);
184 
185  m_instance.ensure_bufsize(msgsize);
186 
187  MPI_Status stat;
188  const int err=MPI_Recv(m_instance.m_comm_buffer,
189  msgsize,MPI_CHAR,ret.source,ret.tag,
190  MPI_COMM_WORLD,&stat);
191 
192  if(err!=MPI_SUCCESS)
193  throw exception("MPI_Recv (3)",err);
194 
195  ret.source=stat.MPI_SOURCE;
196  ret.tag=stat.MPI_TAG;
197 
198  m_instance.deserialize(what);
199 
200  return ret;
201  }
202 
203  template<typename T>
204  static status comm_recv(const int src,T& what)
205  {
206  return comm_recv(src,MPI_ANY_TAG,what);
207  }
208 
209  template<typename T>
210  static status comm_recv(T& what)
211  {
212  return comm_recv(MPI_ANY_SOURCE,MPI_ANY_TAG,what);
213  }
214 
215  #endif // WITH_MPI
216 
217  static void comm_broadcast(int &what,int root=0);
218 
219  template<typename T>
220  static void comm_broadcast(T& what,int root=0)
221  {
222  #ifdef WITH_MPI
223 
224  int msgsize=0;
225  const size_t rnk=comm_rank();
226 
227  if(rnk==size_t(root))
228  msgsize=m_instance.serialize(what);
229 
230  // message size must be known to all processes
231  comm_broadcast(msgsize,root);
232 
233  if(rnk!=size_t(root)) m_instance.ensure_bufsize(msgsize);
234 
235  const int err=MPI_Bcast(m_instance.m_comm_buffer,
236  msgsize,MPI_CHAR,root,MPI_COMM_WORLD);
237 
238  if(err!=MPI_SUCCESS)
239  throw exception("MPI_Bcast (2)",err);
240 
241  if(rnk!=size_t(root))
242  m_instance.deserialize(what);
243  #endif // WITH_MPI
244  }
245 
246  static std::vector<int> comm_gather(const int &what,int root=0);
247 };
248 
249 } // namespace
250 
251 #endif // SHORE_PROCESSING_MESSAGE_PASSING_HPP__
252