SHORE API
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
pipeline.hpp
Go to the documentation of this file.
1 
2 /*
3  * Copyright 2008,2009,2010,2011,2012 Stephan Ossowski, Korbinian Schneeberger,
4  * Felix Ott, Joerg Hagmann, Alf Scotland, Sebastian Bender
5  *
6  * This file is part of SHORE.
7  *
8  * SHORE is free software: you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License as published by
10  * the Free Software Foundation, either version 3 of the License, or
11  * (at your option) any later version.
12  *
13  * SHORE is distributed in the hope that it will be useful,
14  * but WITHOUT ANY WARRANTY; without even the implied warranty of
15  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16  * GNU General Public License for more details.
17  *
18  * You should have received a copy of the GNU General Public License
19  * along with SHORE. If not, see <http://www.gnu.org/licenses/>.
20  */
21 
26 
27 #ifndef SHORE_PROCESSING_PIPELINE_HPP__
28 #define SHORE_PROCESSING_PIPELINE_HPP__
29 
30 #include <map>
31 #include <set>
32 #include <string>
33 #include <vector>
34 
35 #include <boost/bind.hpp>
36 #include <boost/function.hpp>
37 #include <boost/utility/enable_if.hpp>
38 
39 
40 #include "shore/base/util.hpp"
43 
44 
45 namespace shore {
46 
48 template<typename A,typename B>
49 B &operator|(A &a,B &b)
50 {
51  a.sigdata().connect(b.slotdata());
52  a.sigflush().connect(b.slotflush());
53  b.sigfreeze().connect(a.slotfreeze());
54  b.sigthaw().connect(a.slotthaw());
55  return b;
56 }
57 
60 template<class T>
62 {
63  typedef typename T::current_type current_type;
64 };
65 
66 template<class T>
67 struct sink_traits
68 {
71  typedef typename T::append_type append_type;
72 };
73 
80 {
81  private:
82 
84  template<typename Derived,typename InputType,typename OutputType,
85  bool UNCHECKED_PIPE>
86  friend class pipe_facade;
87 
89  template<typename S,typename T>
90  friend class sink;
91 
93  template<typename Chnaf,typename S,typename T>
94  friend class pipe;
95 
96 
99 
100 
101  template<typename T>
102  static void next(T &p,
103  typename boost::enable_if<shore::has_next<T> >::type * =0)
104  {
105  p.next();
106  }
107 
108  template<typename T>
109  static void next(T &p,
110  typename boost::disable_if<shore::has_next<T> >::type * =0)
111  {}
112 
113  template<typename T,typename U>
114  static void prepare(T &p,const U &d,
115  typename boost::enable_if<shore::has_prepare<T> >::type * =0)
116  {
117  p.prepare(d);
118  }
119 
120  template<typename T,typename U>
121  static void prepare(T &p,const U &d,
122  typename boost::disable_if<shore::has_prepare<T> >::type * =0)
123  {}
124 
125  template<typename T,typename U>
126  static void append(T &p,const U &d)
127  {
128  p.append(d);
129  }
130 
131  template<typename T>
132  static void flush(T &p,
133  typename boost::enable_if<shore::has_flush<T> >::type * =0)
134  {
135  p.flush();
136  }
137 
138  template<typename T>
139  static void flush(T &p,
140  typename boost::disable_if<shore::has_flush<T> >::type * =0)
141  {}
142 };
143 
147 template<typename S,typename T=typename source_traits<S>::current_type>
148 class source
149 {
150  public:
151 
153  typedef S source_type;
154 
156  typedef T current_type;
157 
158  private:
159 
160  typedef source<S,T> this_type;
161 
163  source_type m_src;
164 
166  signal<const current_type&> m_sigdata;
167 
170  signal<void> m_sigflush;
171 
175  bool m_flush;
176 
177  bool m_freeze;
178  bool m_frozen;
179 
180  slot<void> m_slotfreeze;
181  slot<void> m_slotthaw;
182 
183 
185  void slotfun_freeze()
186  {
187  if(!m_frozen)
188  m_freeze=true;
189  }
190 
192  void slotfun_thaw()
193  {
194  if(m_frozen)
195  {
196  m_src.next();
197  m_frozen=false;
198  }
199  else if(m_freeze)
200  {
201  m_freeze=false;
202  return;
203  }
204  dump();
205  }
206 
207  public:
208 
209  source()
210  :m_flush(true),m_freeze(false),m_frozen(false),
211  m_slotfreeze(boost::bind(&this_type::slotfun_freeze,this)),
212  m_slotthaw(boost::bind(&this_type::slotfun_thaw,this))
213  {}
214 
215  template<typename Arg1>
216  source(Arg1 arg1)
217  :m_src(arg1),m_flush(true),
218  m_freeze(false),m_frozen(false),
219  m_slotfreeze(boost::bind(&this_type::slotfun_freeze,this)),
220  m_slotthaw(boost::bind(&this_type::slotfun_thaw,this))
221  {}
222 
223  template<typename Arg1,typename Arg2>
224  source(Arg1 arg1,Arg2 arg2)
225  :m_src(arg1,arg2),m_flush(true),
226  m_freeze(false),m_frozen(false),
227  m_slotfreeze(boost::bind(&this_type::slotfun_freeze,this)),
228  m_slotthaw(boost::bind(&this_type::slotfun_thaw,this))
229  {}
230 
231  template<typename Arg1,typename Arg2,typename Arg3>
232  source(Arg1 arg1,Arg2 arg2,Arg3 arg3)
233  :m_src(arg1,arg2,arg3),m_flush(true),
234  m_freeze(false),m_frozen(false),
235  m_slotfreeze(boost::bind(&this_type::slotfun_freeze,this)),
236  m_slotthaw(boost::bind(&this_type::slotfun_thaw,this))
237  {}
238 
239  template<typename Arg1,typename Arg2,typename Arg3,typename Arg4>
240  source(Arg1 arg1,Arg2 arg2,Arg3 arg3,Arg4 arg4)
241  :m_src(arg1,arg2,arg3,arg4),m_flush(true),
242  m_freeze(false),m_frozen(false),
243  m_slotfreeze(boost::bind(&this_type::slotfun_freeze,this)),
244  m_slotthaw(boost::bind(&this_type::slotfun_thaw,this))
245  {}
246 
247  template<typename Arg1,typename Arg2,typename Arg3,typename Arg4,typename Arg5>
248  source(Arg1 arg1,Arg2 arg2,Arg3 arg3,Arg4 arg4,Arg5 arg5)
249  :m_src(arg1,arg2,arg3,arg4,arg5),m_flush(true),
250  m_freeze(false),m_frozen(false),
251  m_slotfreeze(boost::bind(&this_type::slotfun_freeze,this)),
252  m_slotthaw(boost::bind(&this_type::slotfun_thaw,this))
253  {}
254 
255  template<typename Arg1,typename Arg2,typename Arg3,typename Arg4,typename Arg5,typename Arg6>
256  source(Arg1 arg1,Arg2 arg2,Arg3 arg3,Arg4 arg4,Arg5 arg5,Arg6 arg6)
257  :m_src(arg1,arg2,arg3,arg4,arg5,arg6),m_flush(true),
258  m_freeze(false),m_frozen(false),
259  m_slotfreeze(boost::bind(&this_type::slotfun_freeze,this)),
260  m_slotthaw(boost::bind(&this_type::slotfun_thaw,this))
261  {}
262 
264  source(const source& t)
265  :m_flush(t.m_flush),m_freeze(t.m_freeze),m_frozen(t.m_frozen),
266  m_slotfreeze(boost::bind(&this_type::slotfun_freeze,this)),
267  m_slotthaw(boost::bind(&this_type::slotfun_thaw,this))
268  {
269  m_slotfreeze.copy_connections(t.m_slotfreeze);
270  m_slotthaw.copy_connections(t.m_slotthaw);
271  }
272 
276  void set_flush(const bool f)
277  {
278  m_flush=f;
279  }
280 
283  {
284  return &m_src;
285  }
286 
289  {
290  return m_src;
291  }
292 
294  const source_type* operator->() const
295  {
296  return &m_src;
297  }
298 
300  const source_type& operator*() const
301  {
302  return m_src;
303  }
304 
308  {
309  return m_sigdata;
310  }
311 
312 
316  {
317  return m_sigflush;
318  }
319 
323  {
324  return m_slotfreeze;
325  }
326 
330  {
331  return m_slotthaw;
332  }
333 
336  void dump()
337  {
338  while(m_src.has_data())
339  {
340  m_sigdata.emit(m_src.current());
341 
342  // Downstream elements have indicated that they are
343  // unable to process further elements, set to frozen
344  // state and abort.
345  if(m_freeze)
346  {
347  m_frozen=true;
348  m_freeze=false;
349  return;
350  }
351 
352  m_src.next();
353  }
354  // Flush unless it was requested not to.
355  if(m_flush)
356  m_sigflush.emit();
357  }
358 };
359 
363 template<typename S,typename T=typename sink_traits<S>::append_type>
364 class sink
365 {
366  public:
367 
368  typedef S sink_type;
369  typedef T append_type;
370 
371  private:
372 
373  typedef sink<S,T> this_type;
374 
375  sink_type *m_sink;
376 
377  slot<const append_type&> m_slotdata;
378  slot<void> m_slotflush;
379  signal<void> m_sigfreeze;
380  signal<void> m_sigthaw;
381 
382  size_t m_nflushs;
383 
384  bool m_ownsink;
385 
386 
387  void slotfun_flush()
388  {
389  // only flush after the last incoming connection sends the flush signal
390  ++m_nflushs;
391  if(m_nflushs==m_slotflush.nsignals())
392  {
393  m_nflushs=0;
394  pipeline_core_access::flush(*m_sink);
395  }
396  }
397 
398  public:
399 
400  sink()
401  :m_sink(new sink_type),
402  //m_slotdata(boost::bind(static_cast<void(sink_type::*)(const append_type&)>(&sink_type::append),m_sink,_1)),
403  m_slotdata(boost::bind(pipeline_core_access::append<sink_type,append_type>,boost::ref(*m_sink),_1)),
404  m_slotflush(boost::bind(&this_type::slotfun_flush,this)),
405  m_nflushs(0),m_ownsink(true)
406  {}
407 
408  template<typename Arg1>
409  sink(Arg1 arg1)
410  :m_sink(new sink_type(arg1)),
411  //m_slotdata(boost::bind(static_cast<void(sink_type::*)(const append_type&)>(&sink_type::append),m_sink,_1)),
412  m_slotdata(boost::bind(pipeline_core_access::append<sink_type,append_type>,boost::ref(*m_sink),_1)),
413  m_slotflush(boost::bind(&this_type::slotfun_flush,this)),
414  m_nflushs(0),m_ownsink(true)
415  {}
416 
417  template<typename Arg1,typename Arg2>
418  sink(Arg1 arg1,Arg2 arg2)
419  :m_sink(new sink_type(arg1,arg2)),
420  //m_slotdata(boost::bind(static_cast<void(sink_type::*)(const append_type&)>(&sink_type::append),m_sink,_1)),
421  m_slotdata(boost::bind(pipeline_core_access::append<sink_type,append_type>,boost::ref(*m_sink),_1)),
422  m_slotflush(boost::bind(&this_type::slotfun_flush,this)),
423  m_nflushs(0),m_ownsink(true)
424  {}
425 
426  template<typename Arg1,typename Arg2,typename Arg3>
427  sink(Arg1 arg1,Arg2 arg2,Arg3 arg3)
428  :m_sink(new sink_type(arg1,arg2,arg3)),
429  //m_slotdata(boost::bind(static_cast<void(sink_type::*)(const append_type&)>(&sink_type::append),m_sink,_1)),
430  m_slotdata(boost::bind(pipeline_core_access::append<sink_type,append_type>,boost::ref(*m_sink),_1)),
431  m_slotflush(boost::bind(&this_type::slotfun_flush,this)),
432  m_nflushs(0),m_ownsink(true)
433  {}
434 
435  template<typename Arg1,typename Arg2,typename Arg3,typename Arg4>
436  sink(Arg1 arg1,Arg2 arg2,Arg3 arg3,Arg4 arg4)
437  :m_sink(new sink_type(arg1,arg2,arg3,arg4)),
438  //m_slotdata(boost::bind(static_cast<void(sink_type::*)(const append_type&)>(&sink_type::append),m_sink,_1)),
439  m_slotdata(boost::bind(pipeline_core_access::append<sink_type,append_type>,boost::ref(*m_sink),_1)),
440  m_slotflush(boost::bind(&this_type::slotfun_flush,this)),
441  m_nflushs(0),m_ownsink(true)
442  {}
443 
444  template<typename Arg1,typename Arg2,typename Arg3,typename Arg4,typename Arg5>
445  sink(Arg1 arg1,Arg2 arg2,Arg3 arg3,Arg4 arg4,Arg5 arg5)
446  :m_sink(new sink_type(arg1,arg2,arg3,arg4,arg5)),
447  //m_slotdata(boost::bind(static_cast<void(sink_type::*)(const append_type&)>(&sink_type::append),m_sink,_1)),
448  m_slotdata(boost::bind(pipeline_core_access::append<sink_type,append_type>,boost::ref(*m_sink),_1)),
449  m_slotflush(boost::bind(&this_type::slotfun_flush,this)),
450  m_nflushs(0),m_ownsink(true)
451  {}
452 
453  template<typename Arg1,typename Arg2,typename Arg3,typename Arg4,typename Arg5,typename Arg6>
454  sink(Arg1 arg1,Arg2 arg2,Arg3 arg3,Arg4 arg4,Arg5 arg5,Arg6 arg6)
455  :m_sink(new sink_type(arg1,arg2,arg3,arg4,arg5,arg6)),
456  //m_slotdata(boost::bind(static_cast<void(sink_type::*)(const append_type&)>(&sink_type::append),m_sink,_1)),
457  m_slotdata(boost::bind(pipeline_core_access::append<sink_type,append_type>,boost::ref(*m_sink),_1)),
458  m_slotflush(boost::bind(&this_type::slotfun_flush,this)),
459  m_nflushs(0),m_ownsink(true)
460  {}
461 
463  sink(const sink& t)
464  :m_sink(new sink_type(*t.m_sink)),
465  //m_slotdata(boost::bind(static_cast<void(sink_type::*)(const append_type&)>(&sink_type::append),m_sink,_1)),
466  m_slotdata(boost::bind(pipeline_core_access::append<sink_type,append_type>,boost::ref(*m_sink),_1)),
467  m_slotflush(boost::bind(&this_type::slotfun_flush,this)),
468  m_nflushs(t.m_nflushs),m_ownsink(true)
469  {
470  m_slotdata.copy_connections(t.m_slotdata);
471  m_slotflush.copy_connections(t.m_slotflush);
472  }
473 
474  sink(sink_type *const s)
475  :m_sink(s),
476  //m_slotdata(boost::bind(static_cast<void(sink_type::*)(const append_type&)>(&sink_type::append),m_sink,_1)),
477  m_slotdata(boost::bind(pipeline_core_access::append<sink_type,append_type>,boost::ref(*m_sink),_1)),
478  m_slotflush(boost::bind(&this_type::slotfun_flush,this)),
479  m_nflushs(0),m_ownsink(false)
480  {}
481 
482  ~sink()
483  {
484  if(m_ownsink)
485  delete m_sink;
486  }
487 
488  sink_type* operator->()
489  {
490  return m_sink;
491  }
492 
493  sink_type& operator*()
494  {
495  return *m_sink;
496  }
497 
498  const sink_type* operator->() const
499  {
500  return m_sink;
501  }
502 
503  const sink_type& operator*() const
504  {
505  return *m_sink;
506  }
507 
508  slot<const append_type&>& slotdata()
509  {
510  return m_slotdata;
511  }
512 
513  slot<void>& slotflush()
514  {
515  return m_slotflush;
516  }
517 
518  // never emitted automatically
519  signal<void>& sigfreeze()
520  {
521  return m_sigfreeze;
522  }
523 
524  // never emitted automatically
525  signal<void>& sigthaw()
526  {
527  return m_sigthaw;
528  }
529 };
530 
535 template<typename Chnaf,
536  typename S=typename source_traits<Chnaf>::current_type,
537  typename T=typename sink_traits<Chnaf>::append_type>
538 class pipe
539 {
540  public:
541 
542  typedef Chnaf pipe_type;
543  typedef S current_type;
544  typedef T append_type;
545 
546  private:
547 
549 
550  pipe_type m_chnaf;
551 
552  signal<const current_type&> m_sigdata;
553  signal<void> m_sigflush;
554 
555  slot<const append_type&> m_slotdata;
556  slot<void> m_slotflush;
557 
558  slot<void> m_slotfreeze;
559  slot<void> m_slotthaw;
560  signal<void> m_sigfreeze;
561  signal<void> m_sigthaw;
562 
563  size_t m_nflushs;
564 
565  bool m_freeze;
566  bool m_frozen;
567 
568 
569  void slotfun_freeze()
570  {
571  if(!m_frozen)
572  {
573  m_freeze=true;
574  m_sigfreeze.emit();
575  }
576  }
577 
578  void slotfun_thaw()
579  {
580  if(m_frozen)
581  {
582  m_chnaf.next();
583  m_frozen=false;
584  }
585  else if(m_freeze)
586  {
587  m_freeze=false;
588  return;
589  }
590 
591  propagate();
592 
593  // if not frozen, get more data
594  if(!m_chnaf.has_data())
595  m_sigthaw.emit();
596  }
597 
598  void propagate()
599  {
600  while(m_chnaf.has_data())
601  {
602  m_sigdata.emit(m_chnaf.current());
603 
604  if(m_freeze)
605  {
606  m_frozen=true;
607  m_freeze=false;
608  return;
609  }
610 
611  m_chnaf.next();
612  }
613  if(m_nflushs==m_slotflush.nsignals())
614  {
615  m_sigflush.emit();
616  m_nflushs=0;
617  }
618  }
619 
620  void slotfun_append(const append_type& f)
621  {
622  m_chnaf.append(f);
623  propagate();
624  }
625 
626  void slotfun_flush()
627  {
628  ++m_nflushs;
629  if(m_nflushs==m_slotflush.nsignals())
630  {
631  pipeline_core_access::flush(m_chnaf);
632  propagate();
633  }
634  }
635 
636  public:
637 
638  pipe()
639  :m_slotdata(boost::bind(&this_type::slotfun_append,this,_1)),
640  m_slotflush(boost::bind(&this_type::slotfun_flush,this)),
641  m_slotfreeze(boost::bind(&this_type::slotfun_freeze,this)),
642  m_slotthaw(boost::bind(&this_type::slotfun_thaw,this)),
643  m_nflushs(0),m_freeze(false),m_frozen(false)
644  {}
645 
646  template<typename Arg1>
647  pipe(Arg1 arg1)
648  :m_chnaf(arg1),
649  m_slotdata(boost::bind(&this_type::slotfun_append,this,_1)),
650  m_slotflush(boost::bind(&this_type::slotfun_flush,this)),
651  m_slotfreeze(boost::bind(&this_type::slotfun_freeze,this)),
652  m_slotthaw(boost::bind(&this_type::slotfun_thaw,this)),
653  m_nflushs(0),m_freeze(false),m_frozen(false)
654  {}
655 
656  template<typename Arg1,typename Arg2>
657  pipe(Arg1 arg1,Arg2 arg2)
658  :m_chnaf(arg1,arg2),
659  m_slotdata(boost::bind(&this_type::slotfun_append,this,_1)),
660  m_slotflush(boost::bind(&this_type::slotfun_flush,this)),
661  m_slotfreeze(boost::bind(&this_type::slotfun_freeze,this)),
662  m_slotthaw(boost::bind(&this_type::slotfun_thaw,this)),
663  m_nflushs(0),m_freeze(false),m_frozen(false)
664  {}
665 
666  template<typename Arg1,typename Arg2,typename Arg3>
667  pipe(Arg1 arg1,Arg2 arg2,Arg3 arg3)
668  :m_chnaf(arg1,arg2,arg3),
669  m_slotdata(boost::bind(&this_type::slotfun_append,this,_1)),
670  m_slotflush(boost::bind(&this_type::slotfun_flush,this)),
671  m_slotfreeze(boost::bind(&this_type::slotfun_freeze,this)),
672  m_slotthaw(boost::bind(&this_type::slotfun_thaw,this)),
673  m_nflushs(0),m_freeze(false),m_frozen(false)
674  {}
675 
676  template<typename Arg1,typename Arg2,typename Arg3,typename Arg4>
677  pipe(Arg1 arg1,Arg2 arg2,Arg3 arg3,Arg4 arg4)
678  :m_chnaf(arg1,arg2,arg3,arg4),
679  m_slotdata(boost::bind(&this_type::slotfun_append,this,_1)),
680  m_slotflush(boost::bind(&this_type::slotfun_flush,this)),
681  m_slotfreeze(boost::bind(&this_type::slotfun_freeze,this)),
682  m_slotthaw(boost::bind(&this_type::slotfun_thaw,this)),
683  m_nflushs(0),m_freeze(false),m_frozen(false)
684  {}
685 
686  template<typename Arg1,typename Arg2,typename Arg3,typename Arg4,typename Arg5>
687  pipe(Arg1 arg1,Arg2 arg2,Arg3 arg3,Arg4 arg4,Arg5 arg5)
688  :m_chnaf(arg1,arg2,arg3,arg4,arg5),
689  m_slotdata(boost::bind(&this_type::slotfun_append,this,_1)),
690  m_slotflush(boost::bind(&this_type::slotfun_flush,this)),
691  m_slotfreeze(boost::bind(&this_type::slotfun_freeze,this)),
692  m_slotthaw(boost::bind(&this_type::slotfun_thaw,this)),
693  m_nflushs(0),m_freeze(false),m_frozen(false)
694  {}
695 
697  pipe(const pipe& t)
698  :m_chnaf(t.m_chnaf),
699  m_slotdata(boost::bind(&this_type::slotfun_append,this,_1)),
700  m_slotflush(boost::bind(&this_type::slotfun_flush,this)),
701  m_slotfreeze(boost::bind(&this_type::slotfun_freeze,this)),
702  m_slotthaw(boost::bind(&this_type::slotfun_thaw,this)),
703  m_nflushs(t.m_nflushs),m_freeze(false),m_frozen(false)
704  {
705  m_slotdata.copy_connections(t.m_slotdata);
706  m_slotflush.copy_connections(t.m_slotflush);
707  m_slotfreeze.copy_connections(t.m_slotfreeze);
708  m_slotthaw.copy_connections(t.m_slotthaw);
709  }
710 
711  pipe_type* operator->()
712  {
713  return &m_chnaf;
714  }
715 
716  pipe_type& operator*()
717  {
718  return m_chnaf;
719  }
720 
721  const pipe_type* operator->() const
722  {
723  return &m_chnaf;
724  }
725 
726  const pipe_type& operator*() const
727  {
728  return m_chnaf;
729  }
730 
731  signal<const current_type&>& sigdata()
732  {
733  return m_sigdata;
734  }
735 
736  signal<void>& sigflush()
737  {
738  return m_sigflush;
739  }
740 
741  slot<const append_type&>& slotdata()
742  {
743  return m_slotdata;
744  }
745 
746  slot<void>& slotflush()
747  {
748  return m_slotflush;
749  }
750 
751  slot<void>& slotfreeze()
752  {
753  return m_slotfreeze;
754  }
755 
756  slot<void>& slotthaw()
757  {
758  return m_slotthaw;
759  }
760 
761  signal<void>& sigfreeze()
762  {
763  return m_sigfreeze;
764  }
765 
766  signal<void>& sigthaw()
767  {
768  return m_sigthaw;
769  }
770 };
771 
787 template<typename Derived,typename InputType,typename OutputType,
788  bool UNCHECKED_PIPE=false>
790 {
791  public:
792 
793  typedef InputType append_type;
794  typedef OutputType current_type;
795 
796  protected:
797 
803  void emit(const OutputType &d)
804  {
805  if(m_frozen)
806  throw_exception(std::logic_error("pipe_facade::emit: overflow"));
807  if(!(UNCHECKED_PIPE||m_can_emit))
808  throw_exception(std::logic_error("pipe_facade: invalid or multiple"
809  " emit() detected - emit() may be"
810  " called only once in each of"
811  " prepare(), append(),"
812  " next(), flush()"));
813 
814  m_sigdata.emit(d);
815  m_can_emit=false;
816 
817  if(m_freeze)
818  {
819  m_frozen=true;
820  m_freeze=false;
821  }
822  // limit recursion level to 1
823  else if(m_recursion_count==0)
824  {
825  // downstream processing has completed, free output buffer;
826  // may emit recursively until all data is sent
827  do
828  {
829  m_have_emitted=false;
830  ++m_recursion_count;
831 
832  m_can_emit=true;
833  // Subclasses may implement next() to discard
834  // the current element. The next() function may
835  // itself call emit() if more data are available.
836  pipeline_core_access::next(*derived());
837  m_can_emit=false;
838 
839  --m_recursion_count;
840  }
841  while(m_have_emitted)
842  ;
843  }
844  // flag a recursive emit -> will trigger next() at level 0
845  else
846  m_have_emitted=true;
847  }
848 
850 
851  private:
852 
854 
855  const InputType *m_pending_append;
856 
858  size_t m_recursion_count;
860  bool m_have_emitted;
862  bool m_can_emit;
863 
864  signal<const OutputType &> m_sigdata;
865  signal<void> m_sigflush;
866 
867  slot<const InputType &> m_slotdata;
868  slot<void> m_slotflush;
869 
870  slot<void> m_slotfreeze;
871  slot<void> m_slotthaw;
873  signal<void> m_sigfreeze;
875  signal<void> m_sigthaw;
876 
877  bool m_freeze;
878  bool m_frozen;
879  size_t m_numflushs;
880  bool m_flushed;
881 
882 
883  Derived *derived()
884  {
885  return static_cast<Derived *>(this);
886  }
887 
888  void slotfun_append(const InputType &d)
889  {
890  if(m_frozen)
891  throw_exception(std::logic_error("pipe_facade::slotfun_append: overflow"));
892 
893  // generate all output that must be processed downstream before
894  // appending the item
895  m_can_emit=true;
896  // Subclasses may implement prepare() to emit data before a new
897  // element is appended.
898  pipeline_core_access::prepare(*derived(),d);
899  m_can_emit=false;
900 
901  if(!m_frozen)
902  {
903  m_can_emit=true;
904  // Subclasses implement append() to handle appended data.
905  pipeline_core_access::append(*derived(),d);
906  m_can_emit=false;
907  }
908  else
909  m_pending_append=&d;
910  }
911 
912  void slotfun_flush()
913  {
914  if(m_frozen)
915  throw_exception(std::logic_error("pipe_facade::slotfun_flush: overflow"));
916 
917  ++m_numflushs;
918  if(m_numflushs==m_slotflush.nsignals())
919  {
920  m_can_emit=true;
921  // Subclasses may implement flush() to emit remaining
922  // data when no more input is available.
923  pipeline_core_access::flush(*derived());
924  m_can_emit=false;
925 
926  m_flushed=true;
927  if(!m_frozen)
928  m_sigflush.emit();
929  }
930  }
931 
933  void slotfun_freeze()
934  {
935  // pipeline may already be frozen up to this element
936  if(!m_frozen)
937  {
938  m_freeze=true;
939  m_sigfreeze.emit();
940  }
941  }
942 
945  void slotfun_thaw()
946  {
947  if(m_freeze)
948  m_freeze=false;
949  else if(m_frozen)
950  {
951  m_frozen=false;
952 
953  // free output buffer
954  m_can_emit=true;
955  pipeline_core_access::next(*derived());
956  m_can_emit=false;
957 
958  if(!m_frozen)
959  {
960  // resume append
961  if(m_pending_append!=0)
962  {
963  const InputType &p=*m_pending_append;
964  m_pending_append=0;
965 
966  m_can_emit=true;
967  pipeline_core_access::append(*derived(),p);
968  m_can_emit=false;
969 
970  if(!m_frozen)
971  m_sigthaw.emit();
972  }
973  // resume flush
974  else if(m_flushed)
975  m_sigflush.emit();
976  else
977  m_sigthaw.emit();
978  }
979  }
980  else
981  m_sigthaw.emit();
982  }
983 
984  public:
985 
986  pipe_facade()
987  :m_pending_append(0),m_recursion_count(0),m_have_emitted(false),
988  m_can_emit(false),
989  m_slotdata(boost::bind(&this_type::slotfun_append,this,_1)),
990  m_slotflush(boost::bind(&this_type::slotfun_flush,this)),
991  m_slotfreeze(boost::bind(&this_type::slotfun_freeze,this)),
992  m_slotthaw(boost::bind(&this_type::slotfun_thaw,this)),
993  m_freeze(false),m_frozen(false),m_numflushs(0),m_flushed(false)
994  {}
995 
998  :m_pending_append(0),m_recursion_count(0),m_have_emitted(false),
999  m_can_emit(false),
1000  m_slotdata(boost::bind(&this_type::slotfun_append,this,_1)),
1001  m_slotflush(boost::bind(&this_type::slotfun_flush,this)),
1002  m_slotfreeze(boost::bind(&this_type::slotfun_freeze,this)),
1003  m_slotthaw(boost::bind(&this_type::slotfun_thaw,this)),
1004  m_freeze(false),m_frozen(false),m_numflushs(t.m_numflushs),m_flushed(t.m_flushed)
1005  {
1006  m_slotdata.copy_connections(t.m_slotdata);
1007  m_slotflush.copy_connections(t.m_slotflush);
1008  m_slotfreeze.copy_connections(t.m_slotfreeze);
1009  m_slotthaw.copy_connections(t.m_slotthaw);
1010  }
1011 
1014  {
1015  m_slotdata.disconnect_all();
1016  m_slotflush.disconnect_all();
1017  m_sigfreeze.disconnect_all();
1018  m_sigthaw.disconnect_all();
1019  }
1020 
1023  {
1024  m_sigdata.disconnect_all();
1025  m_sigflush.disconnect_all();
1026  m_slotfreeze.disconnect_all();
1027  m_slotflush.disconnect_all();
1028  }
1029 
1030  signal<const OutputType &> &sigdata()
1031  {
1032  return m_sigdata;
1033  }
1034 
1035  signal<void> &sigflush()
1036  {
1037  return m_sigflush;
1038  }
1039 
1040  slot<const InputType &> &slotdata()
1041  {
1042  return m_slotdata;
1043  }
1044 
1045  slot<void> &slotflush()
1046  {
1047  return m_slotflush;
1048  }
1049 
1050  slot<void> &slotfreeze()
1051  {
1052  return m_slotfreeze;
1053  }
1054 
1055  slot<void> &slotthaw()
1056  {
1057  return m_slotthaw;
1058  }
1059 
1060  signal<void> &sigfreeze()
1061  {
1062  return m_sigfreeze;
1063  }
1064 
1065  signal<void> &sigthaw()
1066  {
1067  return m_sigthaw;
1068  }
1069 };
1070 
1071 } // namespace shore
1072 
1073 #endif // SHORE_PROCESSING_PIPELINE_HPP__
1074