SHORE API
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
merge.hpp
Go to the documentation of this file.
1 
2 /*
3  * Copyright 2008,2009,2010,2011,2012 Stephan Ossowski, Korbinian Schneeberger,
4  * Felix Ott, Joerg Hagmann, Alf Scotland, Sebastian Bender
5  *
6  * This file is part of SHORE.
7  *
8  * SHORE is free software: you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License as published by
10  * the Free Software Foundation, either version 3 of the License, or
11  * (at your option) any later version.
12  *
13  * SHORE is distributed in the hope that it will be useful,
14  * but WITHOUT ANY WARRANTY; without even the implied warranty of
15  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16  * GNU General Public License for more details.
17  *
18  * You should have received a copy of the GNU General Public License
19  * along with SHORE. If not, see <http://www.gnu.org/licenses/>.
20  */
21 
25 
26 #ifndef SHORE_PROCESSING_MERGE_HPP__
27 #define SHORE_PROCESSING_MERGE_HPP__
28 
29 #include <set>
30 #include <vector>
31 
32 #include "pipeline.hpp"
33 
34 namespace shore {
35 
37 template<typename T,typename Cmp>
38 class merge
39 {
40  public:
41 
42  class channel;
43 
44  private:
45 
46  struct ccmp
47  {
48  Cmp cmp;
49 
50  ccmp(Cmp c):cmp(c) {}
51 
52  bool operator()(channel*const c1,channel*const c2)
53  {
54  return cmp(c1->current(),c2->current());
55  }
56  };
57 
58  std::vector<sink<channel,T>*> m_channels;
59  std::multiset<channel*,ccmp> m_actives;
60 
61  public:
62 
63  class channel
64  {
65  private:
66 
67  friend class shore::pipeline_core_access;
68  friend class merge;
69  friend class sink<channel,T>;
70 
71  int m_channelnumber;
72 
73  merge* m_parent;
74  sink<channel,T>* m_sink;
75 
76  bool m_inmap;
77  typename std::multiset<channel*,ccmp>::iterator m_where;
78 
79  const T *m_data;
80 
81  channel(merge*const parent)
82  :m_channelnumber(parent->m_channels.size()),
83  m_parent(parent),m_sink(0),m_inmap(false),
84  m_where(m_parent->m_actives.begin()),
85  m_data(0)
86  {}
87 
88  channel(const channel&);
89 
90  void set_sink(sink<channel,T>*const s)
91  {
92  m_sink=s;
93  }
94 
95  int channelnumber() const
96  {
97  return m_channelnumber;
98  }
99 
100  const T& current() const
101  {
102  return *m_data;
103  }
104 
105  bool has_data()
106  {
107  if(!m_inmap)
108  m_sink->sigthaw().emit();
109  return m_inmap;
110  }
111 
112  void next()
113  {
114  if(m_inmap)
115  {
116  m_parent->m_actives.erase(m_where);
117  m_inmap=false;
118  }
119  m_sink->sigthaw().emit();
120  }
121 
122  void append(const T& v)
123  {
124  if(m_inmap)
125  m_parent->m_actives.erase(m_where);
126  m_data=&v;
127  m_where=m_parent->m_actives.insert(this);
128  m_inmap=true;
129  m_sink->sigfreeze().emit();
130  }
131 
132  void flush()
133  {}
134  };
135 
136  typedef T current_type;
137 
138  merge(Cmp c=Cmp())
139  :m_actives(ccmp(c))
140  {}
141 
142  ~merge()
143  {
144  while(!m_channels.empty())
145  {
146  delete m_channels.back();
147  m_channels.pop_back();
148  }
149  }
150 
151  sink<channel,T>& make_channel()
152  {
153  m_channels.push_back(new sink<channel,T>(this));
154  (*m_channels.back())->set_sink(m_channels.back());
155  return *m_channels.back();
156  }
157 
158  const int current_id() const
159  {
160  return (*m_actives.begin())->channelnumber();
161  }
162 
163  const current_type& current() const
164  {
165  return (*m_actives.begin())->current();
166  }
167 
168  bool has_data() const
169  {
170  if(m_actives.empty())
171  {
172  for(size_t i=0;i<m_channels.size();++i)
173  {
174  (*m_channels[i])->has_data();
175  }
176  }
177  return !m_actives.empty();
178  }
179 
180  void next()
181  {
182  (*m_actives.begin())->next();
183  }
184 };
185 
186 } //namespace shore
187 
188 #endif // SHORE_PROCESSING_MERGE_HPP__
189