27 #ifndef SHORE_PROCESSING_PIPELINE_HPP__
28 #define SHORE_PROCESSING_PIPELINE_HPP__
35 #include <boost/bind.hpp>
36 #include <boost/function.hpp>
37 #include <boost/utility/enable_if.hpp>
48 template<
typename A,
typename B>
51 a.sigdata().connect(b.slotdata());
52 a.sigflush().connect(b.slotflush());
53 b.sigfreeze().connect(a.slotfreeze());
54 b.sigthaw().connect(a.slotthaw());
63 typedef typename T::current_type current_type;
71 typedef typename T::append_type append_type;
84 template<
typename Derived,
typename InputType,
typename OutputType,
89 template<
typename S,
typename T>
93 template<
typename Chnaf,
typename S,
typename T>
102 static void next(T &p,
103 typename boost::enable_if<shore::has_next<T> >::type * =0)
109 static void next(T &p,
110 typename boost::disable_if<shore::has_next<T> >::type * =0)
113 template<
typename T,
typename U>
114 static void prepare(T &p,
const U &d,
115 typename boost::enable_if<shore::has_prepare<T> >::type * =0)
120 template<
typename T,
typename U>
121 static void prepare(T &p,
const U &d,
122 typename boost::disable_if<shore::has_prepare<T> >::type * =0)
125 template<
typename T,
typename U>
126 static void append(T &p,
const U &d)
132 static void flush(T &p,
133 typename boost::enable_if<shore::has_flush<T> >::type * =0)
139 static void flush(T &p,
140 typename boost::disable_if<shore::has_flush<T> >::type * =0)
147 template<typename S,typename T=typename source_traits<S>::current_type>
185 void slotfun_freeze()
210 :m_flush(true),m_freeze(false),m_frozen(false),
211 m_slotfreeze(boost::bind(&this_type::slotfun_freeze,this)),
212 m_slotthaw(boost::bind(&this_type::slotfun_thaw,this))
215 template<
typename Arg1>
217 :m_src(arg1),m_flush(true),
218 m_freeze(false),m_frozen(false),
219 m_slotfreeze(boost::bind(&this_type::slotfun_freeze,this)),
220 m_slotthaw(boost::bind(&this_type::slotfun_thaw,this))
223 template<
typename Arg1,
typename Arg2>
224 source(Arg1 arg1,Arg2 arg2)
225 :m_src(arg1,arg2),m_flush(true),
226 m_freeze(false),m_frozen(false),
227 m_slotfreeze(boost::bind(&this_type::slotfun_freeze,this)),
228 m_slotthaw(boost::bind(&this_type::slotfun_thaw,this))
231 template<
typename Arg1,
typename Arg2,
typename Arg3>
232 source(Arg1 arg1,Arg2 arg2,Arg3 arg3)
233 :m_src(arg1,arg2,arg3),m_flush(true),
234 m_freeze(false),m_frozen(false),
235 m_slotfreeze(boost::bind(&this_type::slotfun_freeze,this)),
236 m_slotthaw(boost::bind(&this_type::slotfun_thaw,this))
239 template<
typename Arg1,
typename Arg2,
typename Arg3,
typename Arg4>
240 source(Arg1 arg1,Arg2 arg2,Arg3 arg3,Arg4 arg4)
241 :m_src(arg1,arg2,arg3,arg4),m_flush(true),
242 m_freeze(false),m_frozen(false),
243 m_slotfreeze(boost::bind(&this_type::slotfun_freeze,this)),
244 m_slotthaw(boost::bind(&this_type::slotfun_thaw,this))
247 template<
typename Arg1,
typename Arg2,
typename Arg3,
typename Arg4,
typename Arg5>
248 source(Arg1 arg1,Arg2 arg2,Arg3 arg3,Arg4 arg4,Arg5 arg5)
249 :m_src(arg1,arg2,arg3,arg4,arg5),m_flush(true),
250 m_freeze(false),m_frozen(false),
251 m_slotfreeze(boost::bind(&this_type::slotfun_freeze,this)),
252 m_slotthaw(boost::bind(&this_type::slotfun_thaw,this))
255 template<
typename Arg1,
typename Arg2,
typename Arg3,
typename Arg4,
typename Arg5,
typename Arg6>
256 source(Arg1 arg1,Arg2 arg2,Arg3 arg3,Arg4 arg4,Arg5 arg5,Arg6 arg6)
257 :m_src(arg1,arg2,arg3,arg4,arg5,arg6),m_flush(true),
258 m_freeze(false),m_frozen(false),
259 m_slotfreeze(boost::bind(&this_type::slotfun_freeze,this)),
260 m_slotthaw(boost::bind(&this_type::slotfun_thaw,this))
265 :m_flush(t.m_flush),m_freeze(t.m_freeze),m_frozen(t.m_frozen),
266 m_slotfreeze(boost::bind(&
this_type::slotfun_freeze,this)),
267 m_slotthaw(boost::bind(&
this_type::slotfun_thaw,this))
338 while(m_src.has_data())
340 m_sigdata.
emit(m_src.current());
363 template<typename S,typename T=typename sink_traits<S>::append_type>
369 typedef T append_type;
391 if(m_nflushs==m_slotflush.
nsignals())
394 pipeline_core_access::flush(*m_sink);
401 :m_sink(
new sink_type),
403 m_slotdata(boost::bind(pipeline_core_access::append<sink_type,append_type>,boost::ref(*m_sink),_1)),
404 m_slotflush(boost::bind(&this_type::slotfun_flush,
this)),
405 m_nflushs(0),m_ownsink(
true)
408 template<
typename Arg1>
410 :m_sink(
new sink_type(arg1)),
412 m_slotdata(boost::bind(pipeline_core_access::append<sink_type,append_type>,boost::ref(*m_sink),_1)),
413 m_slotflush(boost::bind(&this_type::slotfun_flush,
this)),
414 m_nflushs(0),m_ownsink(
true)
417 template<
typename Arg1,
typename Arg2>
418 sink(Arg1 arg1,Arg2 arg2)
419 :m_sink(
new sink_type(arg1,arg2)),
421 m_slotdata(boost::bind(pipeline_core_access::append<sink_type,append_type>,boost::ref(*m_sink),_1)),
422 m_slotflush(boost::bind(&this_type::slotfun_flush,
this)),
423 m_nflushs(0),m_ownsink(
true)
426 template<
typename Arg1,
typename Arg2,
typename Arg3>
427 sink(Arg1 arg1,Arg2 arg2,Arg3 arg3)
428 :m_sink(
new sink_type(arg1,arg2,arg3)),
430 m_slotdata(boost::bind(pipeline_core_access::append<sink_type,append_type>,boost::ref(*m_sink),_1)),
431 m_slotflush(boost::bind(&this_type::slotfun_flush,
this)),
432 m_nflushs(0),m_ownsink(
true)
435 template<
typename Arg1,
typename Arg2,
typename Arg3,
typename Arg4>
436 sink(Arg1 arg1,Arg2 arg2,Arg3 arg3,Arg4 arg4)
437 :m_sink(
new sink_type(arg1,arg2,arg3,arg4)),
439 m_slotdata(boost::bind(pipeline_core_access::append<sink_type,append_type>,boost::ref(*m_sink),_1)),
440 m_slotflush(boost::bind(&this_type::slotfun_flush,
this)),
441 m_nflushs(0),m_ownsink(
true)
444 template<
typename Arg1,
typename Arg2,
typename Arg3,
typename Arg4,
typename Arg5>
445 sink(Arg1 arg1,Arg2 arg2,Arg3 arg3,Arg4 arg4,Arg5 arg5)
446 :m_sink(
new sink_type(arg1,arg2,arg3,arg4,arg5)),
448 m_slotdata(boost::bind(pipeline_core_access::append<sink_type,append_type>,boost::ref(*m_sink),_1)),
449 m_slotflush(boost::bind(&this_type::slotfun_flush,
this)),
450 m_nflushs(0),m_ownsink(
true)
453 template<
typename Arg1,
typename Arg2,
typename Arg3,
typename Arg4,
typename Arg5,
typename Arg6>
454 sink(Arg1 arg1,Arg2 arg2,Arg3 arg3,Arg4 arg4,Arg5 arg5,Arg6 arg6)
455 :m_sink(
new sink_type(arg1,arg2,arg3,arg4,arg5,arg6)),
457 m_slotdata(boost::bind(pipeline_core_access::append<sink_type,append_type>,boost::ref(*m_sink),_1)),
458 m_slotflush(boost::bind(&this_type::slotfun_flush,
this)),
459 m_nflushs(0),m_ownsink(
true)
464 :m_sink(new sink_type(*t.m_sink)),
466 m_slotdata(boost::bind(
pipeline_core_access::append<sink_type,append_type>,boost::ref(*m_sink),_1)),
467 m_slotflush(boost::bind(&
this_type::slotfun_flush,this)),
468 m_nflushs(t.m_nflushs),m_ownsink(true)
474 sink(sink_type *
const s)
477 m_slotdata(boost::bind(
pipeline_core_access::append<sink_type,append_type>,boost::ref(*m_sink),_1)),
478 m_slotflush(boost::bind(&this_type::slotfun_flush,this)),
479 m_nflushs(0),m_ownsink(false)
488 sink_type* operator->()
493 sink_type& operator*()
498 const sink_type* operator->()
const
503 const sink_type& operator*()
const
508 slot<const append_type&>& slotdata()
513 slot<void>& slotflush()
519 signal<void>& sigfreeze()
525 signal<void>& sigthaw()
535 template<
typename Chnaf,
536 typename S=
typename source_traits<Chnaf>::current_type,
537 typename T=
typename sink_traits<Chnaf>::append_type>
542 typedef Chnaf pipe_type;
543 typedef S current_type;
544 typedef T append_type;
569 void slotfun_freeze()
594 if(!m_chnaf.has_data())
600 while(m_chnaf.has_data())
602 m_sigdata.
emit(m_chnaf.current());
613 if(m_nflushs==m_slotflush.
nsignals())
620 void slotfun_append(
const append_type& f)
629 if(m_nflushs==m_slotflush.
nsignals())
631 pipeline_core_access::flush(m_chnaf);
639 :m_slotdata(boost::bind(&this_type::slotfun_append,
this,_1)),
640 m_slotflush(boost::bind(&this_type::slotfun_flush,
this)),
641 m_slotfreeze(boost::bind(&this_type::slotfun_freeze,
this)),
642 m_slotthaw(boost::bind(&this_type::slotfun_thaw,
this)),
643 m_nflushs(0),m_freeze(
false),m_frozen(
false)
646 template<
typename Arg1>
649 m_slotdata(boost::bind(&this_type::slotfun_append,
this,_1)),
650 m_slotflush(boost::bind(&this_type::slotfun_flush,
this)),
651 m_slotfreeze(boost::bind(&this_type::slotfun_freeze,
this)),
652 m_slotthaw(boost::bind(&this_type::slotfun_thaw,
this)),
653 m_nflushs(0),m_freeze(
false),m_frozen(
false)
656 template<
typename Arg1,
typename Arg2>
657 pipe(Arg1 arg1,Arg2 arg2)
659 m_slotdata(boost::bind(&this_type::slotfun_append,
this,_1)),
660 m_slotflush(boost::bind(&this_type::slotfun_flush,
this)),
661 m_slotfreeze(boost::bind(&this_type::slotfun_freeze,
this)),
662 m_slotthaw(boost::bind(&this_type::slotfun_thaw,
this)),
663 m_nflushs(0),m_freeze(
false),m_frozen(
false)
666 template<
typename Arg1,
typename Arg2,
typename Arg3>
667 pipe(Arg1 arg1,Arg2 arg2,Arg3 arg3)
668 :m_chnaf(arg1,arg2,arg3),
669 m_slotdata(boost::bind(&this_type::slotfun_append,
this,_1)),
670 m_slotflush(boost::bind(&this_type::slotfun_flush,
this)),
671 m_slotfreeze(boost::bind(&this_type::slotfun_freeze,
this)),
672 m_slotthaw(boost::bind(&this_type::slotfun_thaw,
this)),
673 m_nflushs(0),m_freeze(
false),m_frozen(
false)
676 template<
typename Arg1,
typename Arg2,
typename Arg3,
typename Arg4>
677 pipe(Arg1 arg1,Arg2 arg2,Arg3 arg3,Arg4 arg4)
678 :m_chnaf(arg1,arg2,arg3,arg4),
679 m_slotdata(boost::bind(&this_type::slotfun_append,
this,_1)),
680 m_slotflush(boost::bind(&this_type::slotfun_flush,
this)),
681 m_slotfreeze(boost::bind(&this_type::slotfun_freeze,
this)),
682 m_slotthaw(boost::bind(&this_type::slotfun_thaw,
this)),
683 m_nflushs(0),m_freeze(
false),m_frozen(
false)
686 template<
typename Arg1,
typename Arg2,
typename Arg3,
typename Arg4,
typename Arg5>
687 pipe(Arg1 arg1,Arg2 arg2,Arg3 arg3,Arg4 arg4,Arg5 arg5)
688 :m_chnaf(arg1,arg2,arg3,arg4,arg5),
689 m_slotdata(boost::bind(&this_type::slotfun_append,
this,_1)),
690 m_slotflush(boost::bind(&this_type::slotfun_flush,
this)),
691 m_slotfreeze(boost::bind(&this_type::slotfun_freeze,
this)),
692 m_slotthaw(boost::bind(&this_type::slotfun_thaw,
this)),
693 m_nflushs(0),m_freeze(
false),m_frozen(
false)
699 m_slotdata(boost::bind(&
this_type::slotfun_append,this,_1)),
700 m_slotflush(boost::bind(&
this_type::slotfun_flush,this)),
701 m_slotfreeze(boost::bind(&
this_type::slotfun_freeze,this)),
702 m_slotthaw(boost::bind(&
this_type::slotfun_thaw,this)),
703 m_nflushs(t.m_nflushs),m_freeze(false),m_frozen(false)
711 pipe_type* operator->()
716 pipe_type& operator*()
721 const pipe_type* operator->()
const
726 const pipe_type& operator*()
const
731 signal<const current_type&>& sigdata()
736 signal<void>& sigflush()
741 slot<const append_type&>& slotdata()
746 slot<void>& slotflush()
751 slot<void>& slotfreeze()
756 slot<void>& slotthaw()
761 signal<void>& sigfreeze()
766 signal<void>& sigthaw()
787 template<
typename Derived,
typename InputType,
typename OutputType,
788 bool UNCHECKED_PIPE=
false>
793 typedef InputType append_type;
794 typedef OutputType current_type;
806 throw_exception(std::logic_error(
"pipe_facade::emit: overflow"));
807 if(!(UNCHECKED_PIPE||m_can_emit))
808 throw_exception(std::logic_error(
"pipe_facade: invalid or multiple"
809 " emit() detected - emit() may be"
810 " called only once in each of"
811 " prepare(), append(),"
812 " next(), flush()"));
823 else if(m_recursion_count==0)
829 m_have_emitted=
false;
836 pipeline_core_access::next(*derived());
841 while(m_have_emitted)
855 const InputType *m_pending_append;
858 size_t m_recursion_count;
885 return static_cast<Derived *
>(
this);
888 void slotfun_append(
const InputType &d)
891 throw_exception(std::logic_error(
"pipe_facade::slotfun_append: overflow"));
898 pipeline_core_access::prepare(*derived(),d);
905 pipeline_core_access::append(*derived(),d);
915 throw_exception(std::logic_error(
"pipe_facade::slotfun_flush: overflow"));
918 if(m_numflushs==m_slotflush.
nsignals())
923 pipeline_core_access::flush(*derived());
933 void slotfun_freeze()
955 pipeline_core_access::next(*derived());
961 if(m_pending_append!=0)
963 const InputType &p=*m_pending_append;
967 pipeline_core_access::append(*derived(),p);
987 :m_pending_append(0),m_recursion_count(0),m_have_emitted(false),
989 m_slotdata(boost::bind(&this_type::slotfun_append,this,_1)),
990 m_slotflush(boost::bind(&this_type::slotfun_flush,this)),
991 m_slotfreeze(boost::bind(&this_type::slotfun_freeze,this)),
992 m_slotthaw(boost::bind(&this_type::slotfun_thaw,this)),
993 m_freeze(false),m_frozen(false),m_numflushs(0),m_flushed(false)
998 :m_pending_append(0),m_recursion_count(0),m_have_emitted(false),
1000 m_slotdata(boost::bind(&
this_type::slotfun_append,this,_1)),
1001 m_slotflush(boost::bind(&
this_type::slotfun_flush,this)),
1002 m_slotfreeze(boost::bind(&
this_type::slotfun_freeze,this)),
1003 m_slotthaw(boost::bind(&
this_type::slotfun_thaw,this)),
1004 m_freeze(false),m_frozen(false),m_numflushs(t.m_numflushs),m_flushed(t.m_flushed)
1035 signal<void> &sigflush()
1040 slot<const InputType &> &slotdata()
1045 slot<void> &slotflush()
1050 slot<void> &slotfreeze()
1052 return m_slotfreeze;
1055 slot<void> &slotthaw()
1060 signal<void> &sigfreeze()
1065 signal<void> &sigthaw()
1073 #endif // SHORE_PROCESSING_PIPELINE_HPP__