Silicium
buffer.hpp
Go to the documentation of this file.
1 #ifndef SILICIUM_BUFFER_HPP
2 #define SILICIUM_BUFFER_HPP
3 
4 #include <silicium/observable/observer.hpp>
5 #include <silicium/config.hpp>
6 #include <boost/circular_buffer.hpp>
7 #include <cassert>
8 #include <cstddef>
9 
10 namespace Si
11 {
12  template <class Element, class Original>
14  : private observer<Element>
15  {
16  typedef Element element_type;
17 
19  : receiver(nullptr)
20  , fetching(false)
21  {
22  }
23 
24  explicit buffer_observable(Original from, std::size_t size)
25  : from(std::move(from))
26  , elements(size)
27  , receiver(nullptr)
28  , fetching(false)
29  {
30  }
31 
32 #if defined(_MSC_VER) || (BOOST_VERSION <= 105400)
34  : from(std::move(other.from))
35  , elements(std::move(other.elements))
36  , receiver(nullptr)
37  , fetching(false)
38  {
39  }
40 
42  {
43  //TODO: exception safety
44  from = std::move(other.from);
45  elements = std::move(other.elements);
46  receiver = std::move(other.receiver);
47  fetching = std::move(other.fetching);
48  return *this;
49  }
50 #else
53 #endif
54 
55  void async_get_one(ptr_observer<observer<element_type>> receiver)
56  {
57  assert(!this->receiver);
58  this->receiver = receiver.get();
59  if (elements.empty())
60  {
61  return check_fetch();
62  }
63  else
64  {
65  return deliver_front();
66  }
67  }
68 
69  void prefetch()
70  {
71  check_fetch();
72  }
73 
74  private:
75 
76  Original from;
77  boost::circular_buffer<Element> elements;
78  observer<element_type> *receiver;
79  bool fetching;
80 
81  virtual void got_element(element_type value) SILICIUM_OVERRIDE
82  {
83  assert(!elements.full());
84  assert(fetching);
85  fetching = false;
86  if (elements.empty() &&
87  receiver)
88  {
89  exchange(receiver, nullptr)->got_element(std::move(value));
90  return check_fetch();
91  }
92  elements.push_back(std::move(value));
93  if (!receiver)
94  {
95  return check_fetch();
96  }
97  deliver_front();
98  return check_fetch();
99  }
100 
101  virtual void ended() SILICIUM_OVERRIDE
102  {
103  assert(fetching);
104  assert(receiver);
105  exchange(receiver, nullptr)->ended();
106  }
107 
108  void deliver_front()
109  {
110  auto front = std::move(elements.front());
111  elements.pop_front();
112  exchange(receiver, nullptr)->got_element(std::move(front));
113  }
114 
115  void check_fetch()
116  {
117  if (elements.full())
118  {
119  return;
120  }
121  if (fetching)
122  {
123  return;
124  }
125  fetching = true;
126  from.async_get_one(observe_by_ref(static_cast<observer<Element> &>(*this)));
127  }
128 
131  };
132 
133  template <class Original>
134  auto make_buffer_observable(Original &&from, std::size_t size) -> buffer_observable<typename std::decay<Original>::type::element_type, typename std::decay<Original>::type>
135  {
136  typedef typename std::decay<Original>::type clean_original;
137  typedef typename clean_original::element_type element;
138  return buffer_observable<element, clean_original>(std::forward<Original>(from), size);
139  }
140 }
141 
142 #endif
void async_get_one(ptr_observer< observer< element_type >> receiver)
Definition: buffer.hpp:55
std::remove_reference< T >::type && move(T &&ref)
Definition: move.hpp:10
buffer_observable(buffer_observable &&other)
Definition: buffer.hpp:33
Definition: buffer.hpp:13
Definition: absolute_path.hpp:352
auto make_buffer_observable(Original &&from, std::size_t size) -> buffer_observable< typename std::decay< Original >::type::element_type, typename std::decay< Original >::type >
Definition: buffer.hpp:134
Definition: absolute_path.hpp:19
buffer_observable(Original from, std::size_t size)
Definition: buffer.hpp:24
#define SILICIUM_OVERRIDE
Definition: config.hpp:140
buffer_observable()
Definition: buffer.hpp:18
T exchange(T &dest, U &&source)
Definition: exchange.hpp:30
void prefetch()
Definition: buffer.hpp:69
#define SILICIUM_DELETED_FUNCTION(f)
Definition: config.hpp:111
Element element_type
Definition: buffer.hpp:16
buffer_observable & operator=(buffer_observable &&other)
Definition: buffer.hpp:41