26 #ifndef SHORE_PROCESSING_MESSAGE_PASSING_HPP__
27 #define SHORE_PROCESSING_MESSAGE_PASSING_HPP__
35 #define MPICH_IGNORE_CXX_SEEK 1
37 #include <boost/serialization/string.hpp>
38 #include <boost/archive/text_oarchive.hpp>
39 #include <boost/archive/text_iarchive.hpp>
41 #define SMPI_INFO(x) std::string(x)
43 #define SMPI_INFO(x) std::string()
54 static mpi m_instance;
57 :
public std::runtime_error
61 exception(
const std::string& label,
const int)
62 :std::runtime_error(
"MPI error: "+label)
68 size_t m_comm_bufsize;
76 mpi& operator=(
const mpi&);
81 void ensure_bufsize(
const size_t size);
86 std::ostringstream str;
88 boost::archive::text_oarchive arch(str);
92 const std::string
serial=str.str();
95 m_instance.ensure_bufsize(serial.size()+1);
97 std::copy(serial.begin(),serial.end(),m_instance.m_comm_buffer);
101 m_instance.m_comm_buffer[serial.size()]=0;
103 return serial.size()+1;
107 size_t deserialize(T& what,
const size_t offset=0)
109 const std::string
serial(m_comm_buffer+offset);
111 std::istringstream str(serial);
113 boost::archive::text_iarchive arch(str);
117 return serial.size()+1;
138 static void no_mpi(
const std::string& name);
140 static void start(
int ac,
char**av);
144 static int env_maxtag();
146 static size_t comm_size();
147 static size_t comm_rank();
148 static bool comm_isroot();
150 static void comm_barrier();
153 static status comm_probe();
155 static void comm_send(
const int dest,
const int tag);
157 static status comm_recv(
const int src,
const int tag);
159 static void comm_send(
const int dest,
const int tag,
const int &what);
161 static status comm_recv(
const int src,
const int tag,
int &what);
164 static void comm_send(
const int dest,
const int tag,
const T& what)
166 const int msgsize=m_instance.serialize(what);
168 comm_send(dest,tag,msgsize);
170 const int err=MPI_Ssend(m_instance.m_comm_buffer,
171 msgsize,MPI_CHAR,dest,tag,MPI_COMM_WORLD);
174 throw exception(
"MPI_Ssend (3)",err);
178 static status comm_recv(
const int src,
const int tag,T& what)
183 ret=comm_recv(src,tag,msgsize);
185 m_instance.ensure_bufsize(msgsize);
188 const int err=MPI_Recv(m_instance.m_comm_buffer,
189 msgsize,MPI_CHAR,ret.source,ret.tag,
190 MPI_COMM_WORLD,&stat);
193 throw exception(
"MPI_Recv (3)",err);
195 ret.source=stat.MPI_SOURCE;
196 ret.tag=stat.MPI_TAG;
198 m_instance.deserialize(what);
204 static status comm_recv(
const int src,T& what)
206 return comm_recv(src,MPI_ANY_TAG,what);
210 static status comm_recv(T& what)
212 return comm_recv(MPI_ANY_SOURCE,MPI_ANY_TAG,what);
217 static void comm_broadcast(
int &what,
int root=0);
220 static void comm_broadcast(T& what,
int root=0)
225 const size_t rnk=comm_rank();
227 if(rnk==
size_t(root))
228 msgsize=m_instance.serialize(what);
231 comm_broadcast(msgsize,root);
233 if(rnk!=
size_t(root)) m_instance.ensure_bufsize(msgsize);
235 const int err=MPI_Bcast(m_instance.m_comm_buffer,
236 msgsize,MPI_CHAR,root,MPI_COMM_WORLD);
239 throw exception(
"MPI_Bcast (2)",err);
241 if(rnk!=
size_t(root))
242 m_instance.deserialize(what);
246 static std::vector<int> comm_gather(
const int &what,
int root=0);
251 #endif // SHORE_PROCESSING_MESSAGE_PASSING_HPP__