26 #ifndef SHORE_PROCESSING_MERGE_HPP__
27 #define SHORE_PROCESSING_MERGE_HPP__
37 template<
typename T,
typename Cmp>
52 bool operator()(channel*
const c1,channel*
const c2)
54 return cmp(c1->current(),c2->current());
58 std::vector<sink<channel,T>*> m_channels;
59 std::multiset<channel*,ccmp> m_actives;
69 friend class sink<channel,T>;
77 typename std::multiset<channel*,ccmp>::iterator m_where;
81 channel(
merge*
const parent)
82 :m_channelnumber(parent->m_channels.size()),
83 m_parent(parent),m_sink(0),m_inmap(
false),
84 m_where(m_parent->m_actives.begin()),
88 channel(
const channel&);
95 int channelnumber()
const
97 return m_channelnumber;
100 const T& current()
const
108 m_sink->sigthaw().
emit();
116 m_parent->m_actives.erase(m_where);
119 m_sink->sigthaw().emit();
122 void append(
const T& v)
125 m_parent->m_actives.erase(m_where);
127 m_where=m_parent->m_actives.insert(
this);
129 m_sink->sigfreeze().emit();
136 typedef T current_type;
144 while(!m_channels.empty())
146 delete m_channels.back();
147 m_channels.pop_back();
154 (*m_channels.back())->set_sink(m_channels.back());
155 return *m_channels.back();
158 const int current_id()
const
160 return (*m_actives.begin())->channelnumber();
163 const current_type& current()
const
165 return (*m_actives.begin())->current();
168 bool has_data()
const
170 if(m_actives.empty())
172 for(
size_t i=0;i<m_channels.size();++i)
174 (*m_channels[i])->has_data();
177 return !m_actives.empty();
182 (*m_actives.begin())->next();
188 #endif // SHORE_PROCESSING_MERGE_HPP__