1919#pragma once
2020
2121#include < cstdint>
22- #include < iterator>
2322#include < memory>
2423#include < type_traits>
2524#include < unordered_map>
2827
2928#include " caf/actor.hpp"
3029#include " caf/actor_addr.hpp"
31- #include " caf/actor_clock.hpp"
3230#include " caf/actor_system.hpp"
3331#include " caf/actor_system_config.hpp"
3432#include " caf/binary_deserializer.hpp"
4038#include " caf/detail/worker_hub.hpp"
4139#include " caf/error.hpp"
4240#include " caf/fwd.hpp"
43- #include " caf/intrusive/drr_queue.hpp"
44- #include " caf/intrusive/fifo_inbox.hpp"
45- #include " caf/intrusive/singly_linked.hpp"
4641#include " caf/mailbox_element.hpp"
4742#include " caf/net/actor_proxy_impl.hpp"
4843#include " caf/net/basp/constants.hpp"
5348#include " caf/net/basp/worker.hpp"
5449#include " caf/net/consumer.hpp"
5550#include " caf/net/consumer_queue.hpp"
56- #include " caf/net/multiplexer.hpp"
5751#include " caf/net/receive_policy.hpp"
5852#include " caf/net/socket_manager.hpp"
5953#include " caf/node_id.hpp"
60- #include " caf/policy/normal_messages.hpp"
6154#include " caf/proxy_registry.hpp"
6255#include " caf/response_promise.hpp"
6356#include " caf/scoped_execution_unit.hpp"
6457#include " caf/send.hpp"
6558#include " caf/tag/message_oriented.hpp"
6659#include " caf/unit.hpp"
67- #include " caf/variant.hpp"
6860
6961namespace caf ::net::basp {
7062
@@ -96,34 +88,35 @@ class CAF_NET_EXPORT application : public consumer {
9688 error init (socket_manager* owner, LowerLayerPtr down, const settings& cfg) {
9789 // Initialize member variables.
9890 owner_ = owner;
99- system_ = &owner->mpx (). system ();
91+ system_ = &owner->system ();
10092 executor_.system_ptr (system_);
10193 executor_.proxy_registry_ptr (&proxies_);
94+ max_throughput_ = get_or (cfg, " caf.scheduler.max-throughput" ,
95+ defaults::scheduler::max_throughput);
10296 auto workers = get_or<size_t >(
10397 cfg, " caf.middleman.workers" ,
10498 std::min (3u , std::thread::hardware_concurrency () / 4u ) + 1 );
105- max_throughput_ = get_or (system ().config (), " caf.scheduler.max-throughput" ,
106- defaults::scheduler::max_throughput);
10799 for (size_t i = 0 ; i < workers; ++i)
108100 hub_->add_new_worker (*queue_, proxies_);
109101 // Write handshake.
110- return write_message (
111- down, header{message_type::handshake, version}, system (). node (),
112- get_or ( system (). config (), " caf.middleman.app-identifiers " ,
113- application::default_app_ids ()) );
102+ auto app_ids = get_or (cfg, " caf.middleman.app-identifiers " ,
103+ application::default_app_ids ());
104+ return write_message (down, header{message_type::handshake, version} ,
105+ system (). node (), app_ids );
114106 }
115107
116108 template <class LowerLayerPtr >
117109 bool prepare_send (LowerLayerPtr& down) {
110+ CAF_LOG_TRACE (" " );
118111 if (!handshake_complete ())
119112 return true ;
120113 if (auto err = dequeue_events (down)) {
121- CAF_LOG_ERROR (" handle_events failed: " << CAF_ARG (err));
114+ CAF_LOG_ERROR (" dequeue_events failed: " << CAF_ARG (err));
122115 down->abort_reason (err);
123116 return false ;
124117 }
125118 if (auto err = dequeue_messages (down)) {
126- CAF_LOG_ERROR (" handle_messages failed: " << CAF_ARG (err));
119+ CAF_LOG_ERROR (" dequeue_messages failed: " << CAF_ARG (err));
127120 down->abort_reason (err);
128121 return false ;
129122 }
@@ -132,6 +125,7 @@ class CAF_NET_EXPORT application : public consumer {
132125
133126 template <class LowerLayerPtr >
134127 ptrdiff_t consume (LowerLayerPtr& down, byte_span buffer) {
128+ CAF_LOG_TRACE (CAF_ARG2 (" buffer.size" , buffer.size ()));
135129 if (auto err = handle (down, buffer)) {
136130 CAF_LOG_ERROR (" could not handle message: " << CAF_ARG (err));
137131 down->abort_reason (err);
@@ -142,13 +136,15 @@ class CAF_NET_EXPORT application : public consumer {
142136
143137 template <class LowerLayerPtr >
144138 bool done_sending (LowerLayerPtr&) {
139+ CAF_LOG_TRACE (" " );
145140 if (mailbox_.blocked ())
146141 return true ;
147142 return (mailbox_.empty () && mailbox_.try_block ());
148143 }
149144
150145 template <class LowerLayerPtr >
151146 void abort (LowerLayerPtr&, const error&) {
147+ CAF_LOG_TRACE (" " );
152148 // nop
153149 }
154150
@@ -163,15 +159,12 @@ class CAF_NET_EXPORT application : public consumer {
163159 // / Writes a message to the message buffer of `down`.
164160 template <class LowerLayerPtr , class ... Ts>
165161 error write_message (LowerLayerPtr& down, header hdr, Ts&&... xs) {
162+ CAF_LOG_TRACE (CAF_ARG (hdr));
166163 down->begin_message ();
167164 auto & buf = down->message_buffer ();
168165 binary_serializer sink{&executor_, buf};
169- if (!sink.apply_object (hdr))
166+ if (!sink.apply_objects (hdr, xs... ))
170167 return sink.get_error ();
171- if constexpr (sizeof ...(xs) >= 1 ) {
172- if (!sink.apply_objects (xs...))
173- return sink.get_error ();
174- }
175168 down->end_message ();
176169 return none;
177170 }
@@ -212,6 +205,7 @@ class CAF_NET_EXPORT application : public consumer {
212205
213206 template <class LowerLayerPtr >
214207 error dequeue_events (LowerLayerPtr& down) {
208+ CAF_LOG_TRACE (" " );
215209 if (!mailbox_.blocked ()) {
216210 mailbox_.fetch_more ();
217211 auto & q = std::get<0 >(mailbox_.queue ().queues ());
@@ -220,14 +214,14 @@ class CAF_NET_EXPORT application : public consumer {
220214 for (auto ptr = q.next (); ptr != nullptr ; ptr = q.next ()) {
221215 auto f = detail::make_overload (
222216 [&](consumer_queue::event::resolve_request& x) {
223- write_resolve_request (down, x.locator , x.listener );
217+ write_resolve_request (down, x.path , x.listener );
224218 },
225219 [&](consumer_queue::event::new_proxy& x) { new_proxy (down, x.id ); },
226220 [&](consumer_queue::event::local_actor_down& x) {
227221 local_actor_down (down, x.id , std::move (x.reason ));
228222 },
229223 [&](consumer_queue::event::timeout& x) {
230- timeout (down, x.type , x.id );
224+ timeout (down, std::move ( x.type ) , x.id );
231225 });
232226 visit (f, ptr->value );
233227 }
@@ -250,27 +244,31 @@ class CAF_NET_EXPORT application : public consumer {
250244 }
251245
252246 template <class LowerLayerPtr >
253- void new_proxy (LowerLayerPtr& down, actor_id id) {
247+ void new_proxy (LowerLayerPtr& down, actor_id aid) {
248+ CAF_LOG_TRACE (CAF_ARG (aid));
254249 if (auto err = write_message (down, header{message_type::monitor_message,
255- static_cast <uint64_t >(id )}))
250+ static_cast <uint64_t >(aid )}))
256251 down->abort_reason (err);
257252 }
258253
259254 template <class LowerLayerPtr >
260- void local_actor_down (LowerLayerPtr& down, actor_id id, error reason) {
255+ void local_actor_down (LowerLayerPtr& down, actor_id aid, error reason) {
256+ CAF_LOG_TRACE (CAF_ARG (aid) << CAF_ARG (reason));
261257 if (auto err = write_message (
262- down, header{message_type::down_message, static_cast <uint64_t >(id )},
258+ down, header{message_type::down_message, static_cast <uint64_t >(aid )},
263259 reason))
264260 down->abort_reason (err);
265261 }
266262
267263 template <class LowerLayerPtr >
268264 void timeout (LowerLayerPtr& down, std::string type, uint64_t id) {
265+ CAF_LOG_TRACE (CAF_ARG (type) << CAF_ARG (id));
269266 down->timeout (std::move (type), id);
270267 }
271268
272269 template <class LowerLayerPtr >
273270 error dequeue_messages (LowerLayerPtr& down) {
271+ CAF_LOG_TRACE (" " );
274272 for (size_t count = 0 ; count < max_throughput_; ++count) {
275273 auto ptr = next_message ();
276274 if (ptr == nullptr )
@@ -376,6 +374,7 @@ class CAF_NET_EXPORT application : public consumer {
376374
377375 template <class LowerLayerPtr >
378376 error handle_actor_message (LowerLayerPtr&, header hdr, byte_span payload) {
377+ CAF_LOG_TRACE (CAF_ARG (hdr) << CAF_ARG2 (" payload.size" , payload.size ()));
379378 auto worker = hub_->pop ();
380379 if (worker != nullptr ) {
381380 CAF_LOG_DEBUG (" launch BASP worker for deserializing an actor_message" );
0 commit comments