我正在嘗試撰寫異步寫入和讀取資料。我有一些導致寫入緩沖區的功能:
/*
struct DDSEntity
{
std::string key;
std::string participant_key;
std::string topic_name;
std::string topic_type;
bool keyless;
dds_qos_t qos;
std::map<std::string, RouteStatus> routes;
};
struct DiscoveryEvent
{
enum DiscoveryEventType
{
DiscoveredPublication,
UndiscoveredPublication,
DiscoveredSubscription,
UndiscoveredSubscription
};
std::shared_ptr<DDSEntity> entity;
DiscoveryEventType event_type;
};
*/
boost::mutex guard; //global mutex
void send_discovery_event(const dds_entity_t dp, boost::asio::local::stream_protocol::socket *socket,
const DiscoveryEvent& event)
{
SPDLOG_DEBUG("Send discovery event");
boost::async([socket, event]() {
boost::mutex::scoped_lock scoped_lock(guard);
auto bufs = boost::asio::buffer(&event, sizeof(rodds::dds_discovery::DiscoveryEvent));
auto size = boost::asio::write(*socket, bufs);
});
}
另一方面,我正在嘗試像這樣開始讀取發送的資料
class Plugin
{
public:
Plugin(
const dds_entity_t &dp, boost::asio::local::stream_protocol::socket &rx)
: _reader(&rx), _dp(dp), _buffer(&_de, sizeof(rodds::dds_discovery::DiscoveryEvent))
{
SPDLOG_INFO("Plugin initialized");
_reader->async_read_some(
_buffer,
boost::bind(
&Plugin::async_read_handler,
this,
boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred
));
}
void async_read_handler(const boost::system::error_code &error, std::size_t bytes_trans)
{
assert(!error);
assert(bytes_trans == sizeof(rodds::dds_discovery::DiscoveryEvent));
if (_de.event_type == rodds::dds_discovery::DiscoveryEvent::DiscoveredPublication ||
_de.event_type == rodds::dds_discovery::DiscoveryEvent::DiscoveredSubscription)
SPDLOG_INFO("Catch discovery event:{0}, {1}, {2}", _de.event_type, _de.entity->topic_name, _de.entity->topic_type);
else
SPDLOG_INFO("Catch discovery event:{0}", _de.event_type);
_reader->async_read_some(
_buffer,
boost::bind(
&Plugin::async_read_handler,
this,
boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred
));
}
private:
boost::asio::local::stream_protocol::socket* _reader;
boost::asio::mutable_buffer _buffer;
rodds::dds_discovery::DiscoveryEvent _de;
dds_entity_t _dp;
};
int main(int argc, char* argv[])
{
spdlog::set_level(spdlog::level::debug);
// programm can create reader for only one dds_topic
if (argc == 1 || argc > 2)
{
SPDLOG_ERROR("Provide topic name for forwading reader process");
return 0;
}
SPDLOG_INFO("Provided topic to read: {0}", argv[1]);
// create domain_participant, reader and writer
// sockets to catch rodds::dds_discovery::DiscoveryEvent`s
SPDLOG_INFO("Generate DDS domain participant");
const dds_entity_t dp = dds_create_participant(0, NULL, NULL);
boost::asio::io_service io_service;
SPDLOG_INFO("Create reader/writer sockets");
boost::asio::local::stream_protocol::socket tx(io_service), rx(io_service);
boost::asio::local::connect_pair(tx, rx);
boost::asio::io_service::work work(io_service);
// create Plugin instance
Plugin plugin(dp, rx);
rodds::dds_discovery::run_discovery(dp, &tx);
io_service.run();
return 0;
}
但事實證明,我顯然讀取了已經覆寫的資料,因為我得到了輸出
[2022-09-11 13:23:19.226] [info] [forwading_dds_reader.cpp:61] Catch discovery event:0, rt/rosout,|msg::dds_::Log_
[2022-09-11 13:23:19.226] [info] [forwading_dds_reader.cpp:61] Catch discovery event:0,|rametersReply,|srv::dds_::GetParameters_Response_
[2022-09-11 13:23:19.226] [info] [forwading_dds_reader.cpp:61] Catch discovery event:0, rameter_typesReply,srv::dds_::GetParameterTypes_Response_
[2022-09-11 13:23:19.226] [info] [forwading_dds_reader.cpp:61] Catch discovery event:0, CrametersReply,srv::dds_::SetParameters_Response_
[2022-09-11 13:23:19.227] [info] [forwading_dds_reader.cpp:61] Catch discovery event:0,|rameters_atomicallyReply,|srv::dds_::SetParametersAtomically_Response_
[2022-09-11 13:23:19.227] [info] [forwading_dds_reader.cpp:61] Catch discovery event:0, |be_parametersReply,|srv::dds_::DescribeParameters_Response_
[2022-09-11 13:23:19.227] [info] [forwading_dds_reader.cpp:61] Catch discovery event:0, |arametersReply,|srv::dds_::ListParameters_Response_
[2022-09-11 13:23:19.227] [info] [forwading_dds_reader.cpp:61] Catch discovery event:0, |nts,|msg::dds_::ParameterEvent_
[2022-09-11 13:23:19.228] [info] [forwading_dds_reader.cpp:61] Catch discovery event:0, rt/chatter,|ds_::String_
如果他們指出我的錯誤,我將不勝感激。謝謝。
uj5u.com熱心網友回復:
您似乎錯過了有線格式的關鍵概念。你正在通過例如
auto bufs = boost::asio::buffer(&event, sizeof(rodds::dds_discovery::DiscoveryEvent));
將其視為event
POD 普通型別。它不是,所以這是未定義的行為!它聚合了許多非 POD 型別(std::string、std::shared_ptr、std::map 等)。為了將這些放在網路上,您必須決定序列化格式。
我最近回答了一個問題Send an array of ints with boost::asio問題給出了一個有效的選項串列。答案包含一些您想閱讀的好建議。我自己當時的回答集中在使用帶有 Asio 的 POD 緩沖區,您也可以將其應用于您的情況。
但是,在您的情況下,它“聞起來”就像您可能會在序列化方法方面得到更多幫助,因為它不需要您對物件的記憶體布局和實作它的線格式進行太多/認真的思考。
提升序列化
您可能會爭取像這樣的實作
void send_discovery_event(const dds_entity_t /*dp*/, boost::asio::local::stream_protocol::socket *socket,
const DiscoveryEvent& event)
{
SPDLOG_DEBUG("Send discovery event");
boost::async([socket, event]() {
boost::mutex::scoped_lock scoped_lock(guard);
/*auto size =*/boost::asio::write(
*socket, boost::asio::buffer(ToWireFormat(event)));
});
}
WhereToWireFormat
使用可移植的文本表示:
template <typename Packet>
std::string ToWireFormat(Packet const& packet) {
std::ostringstream oss;
{
boost::archive::text_oarchive oa(oss);
oa << packet;
} // flush and complete archive
std::string data = std::move(oss).str();
return std::to_string(data.length()) " " data;
}
通過添加成員函式,這可以使用 Boost 序列化serialize
:
struct DDSEntity
{
std::string key;
std::string participant_key;
std::string topic_name;
std::string topic_type;
bool keyless;
dds_qos_t qos;
std::map<std::string, RouteStatus> routes;
// clang-format off
template <typename Ar> void serialize(Ar& ar, unsigned) {
ar & key & participant_key
& topic_name & topic_type & keyless
& qos & routes;
}
// clang-format on
};
using dds_entity_t = DDSEntity;
struct DiscoveryEvent
{
enum DiscoveryEventType
{
DiscoveredPublication,
UndiscoveredPublication,
DiscoveredSubscription,
UndiscoveredSubscription
};
std::shared_ptr<DDSEntity> entity;
DiscoveryEventType event_type;
template <typename Ar> void serialize(Ar& ar, unsigned) {
ar & entity & event_type;
}
};
當然,您沒有顯示的組成型別也應該是可序列化的。讓我們用自由函式serialize
來證明這一點:
struct dds_qos_t {}; // TODO IMPLEMENT
struct RouteStatus {}; // TODO IMPLEMENT
template <typename Ar>
static inline void serialize(Ar&, dds_qos_t&, unsigned) { /* TODO implement */ }
template <typename Ar>
static inline void serialize(Ar&, RouteStatus&, unsigned) { /* TODO implement */ }
現在,重要的部分是std::string
,shared_ptr
并且map
都是 Boost 內置的:
#include <boost/archive/text_iarchive.hpp>
#include <boost/archive/text_oarchive.hpp>
#include <boost/serialization/string.hpp>
#include <boost/serialization/map.hpp>
#include <boost/serialization/shared_ptr.hpp>
夠了!例如,只做
dds_entity_t dds_create_participant(int, void*, void*) {
return dds_entity_t{
"key",
"participant_key",
"topic_name",
"topic_type",
true,
dds_qos_t{},
{
{"route1", RouteStatus{}},
{"route2", RouteStatus{}},
{"route3", RouteStatus{}},
},
};
}
const dds_entity_t dp = dds_create_participant(0, NULL, NULL);
std::cout << ToWireFormat(dp) << std::endl;
列印例如
137 22 serialization::archive 19 1 0
0 3 key 15 participant_key 10 topic_name 10 topic_type 1 0 0 0 0 3 0 0 0 6 route1 0 0 6 route2 6 route3
進一步的問題
我不確定互斥鎖的使用。如果它同步對事件資料的訪問,您可以縮小范圍:
boost::mutex::scoped_lock scoped_lock(guard); auto payload = ToWireFormat(event); scoped_lock.unlock(); /*auto size =*/boost::asio::write(*socket, boost::asio::buffer(payload));
如果你想在套接字上同步訪問,更喜歡使用 strand:Strands: Use Threads without Explicit Locking
閱讀時,您使用
read_some
which 沒有邏輯來確保閱讀完成。相反,使用自由函陣列合操作(asio::async_read、asio::async_read_until)來讀取所需的資訊:void do_receive_message() { async_read_until(_reader, _buffer, " ", boost::bind(&Plugin::on_read_length, this, ph::error, ph::bytes_transferred)); } void on_read_length(error_code ec, size_t xfr) { logger << "DEBUG\t" << __FUNCTION__ << "\t" << ec.message() << " " << xfr << std::endl; std::size_t length; char space; if (!ec.failed() && std::istream(&_buffer) >> std::noskipws >> length >> space && space == ' ') // { if (length <= _buffer.size()) on_read_message(ec, 0); else { logger << "Reading " << length << " more to complete message" << std::endl; async_read(_reader, _buffer, asio::transfer_exactly(length - _buffer.size()), boost::bind(&Plugin::on_read_message, this, ph::error, ph::bytes_transferred)); } } } void on_read_message(error_code ec, size_t xfr) { logger << "DEBUG\t" << __FUNCTION__ << "\t" << ec.message() << " " << xfr << std::endl; std::istream msg(&_buffer); auto de = FromWireFormat<DiscoveryEvent>(msg); if (de.event_type == DiscoveryEvent::DiscoveredPublication || de.event_type == DiscoveryEvent::DiscoveredSubscription) logger << "INFO\t" << "Catch discovery event:" << de.event_type << ", " << de.entity->topic_name << ", " << de.entity->topic_type << std::endl; else logger << "INFO\t" << "Catch discovery event:" << de.event_type << std::endl; if (!ec) do_receive_message(); }
這假設我們將緩沖區替換為
asio::streambuf
:asio::streambuf _buffer;
并且
FromWireFormat
再次是 Boost 序列化函式的簡單包裝器:template <typename Packet> Packet FromWireFormat(std::istream& is) { // length has already been taken off by the read operations Packet packet; boost::archive::text_iarchive ia(is); ia >> packet; is.ignore(2, '\n'); // eat newline from archive return packet; }
完整演示
還對一些東西進行了現代化改造并擺脫了boost::async
和互斥體以支持鏈,因此我們在套接字上沒有競爭條件:
住在科利魯
#include <boost/asio.hpp>
#include <boost/bind/bind.hpp>
#include <iostream>
#include <boost/archive/text_iarchive.hpp>
#include <boost/archive/text_oarchive.hpp>
#include <boost/serialization/string.hpp>
#include <boost/serialization/map.hpp>
#include <boost/serialization/shared_ptr.hpp>
using boost::system::error_code;
namespace asio = boost::asio;
namespace ph = asio::placeholders;
using protocol = asio::local::stream_protocol;
using namespace std::chrono_literals;
static std::ostream logger(std::cout.rdbuf());
namespace rodds { namespace dds_discovery {
struct dds_qos_t {}; // TODO IMPLEMENT
struct RouteStatus {}; // TODO IMPLEMENT
template <typename Ar>
static inline void serialize(Ar&, dds_qos_t&, unsigned) { /* TODO implement */ }
template <typename Ar>
static inline void serialize(Ar&, RouteStatus&, unsigned) { /* TODO implement */ }
struct DDSEntity
{
std::string key;
std::string participant_key;
std::string topic_name;
std::string topic_type;
bool keyless;
dds_qos_t qos;
std::map<std::string, RouteStatus> routes;
// clang-format off
template <typename Ar> void serialize(Ar& ar, unsigned) {
ar & key & participant_key
& topic_name & topic_type & keyless
& qos & routes;
}
// clang-format on
};
using dds_entity_t = DDSEntity;
struct DiscoveryEvent
{
enum DiscoveryEventType
{
DiscoveredPublication,
UndiscoveredPublication,
DiscoveredSubscription,
UndiscoveredSubscription
};
std::shared_ptr<DDSEntity> entity;
DiscoveryEventType event_type;
template <typename Ar> void serialize(Ar& ar, unsigned) {
ar & entity & event_type;
}
};
template <typename Packet>
std::string ToWireFormat(Packet const& packet) {
std::ostringstream oss;
{
boost::archive::text_oarchive oa(oss);
oa << packet;
} // flush and complete archive
std::string data = std::move(oss).str();
return std::to_string(data.length()) " " data;
}
template <typename Packet>
Packet FromWireFormat(std::istream& is) {
// length has already been taken off by the read operations
Packet packet;
boost::archive::text_iarchive ia(is);
ia >> packet;
is.ignore(2, '\n'); // eat newline from archive
return packet;
}
}} // namespace rodds::dds_discovery
using namespace rodds::dds_discovery;
void send_discovery_event(const dds_entity_t /*dp*/, protocol::socket& socket,
const DiscoveryEvent& event) {
logger << "DEBUG\tSend discovery event" << std::endl;
asio::post(socket.get_executor(), [&socket, event]() {
auto size = asio::write(socket, asio::buffer(ToWireFormat(event)));
logger << "DEBUG\tSent discovery event (" << size << ")" << std::endl;
});
}
class Plugin {
public:
Plugin(const dds_entity_t& dp, protocol::socket& rx)
: _reader(rx)
, _dp(dp) {
logger << "INFO\tPlugin initialized" << std::endl;
do_receive_message();
}
void do_receive_message() {
async_read_until(_reader, _buffer, " ",
boost::bind(&Plugin::on_read_length, this, ph::error,
ph::bytes_transferred));
}
void on_read_length(error_code ec, size_t xfr) {
logger << "DEBUG\t" << __FUNCTION__ << "\t" << ec.message() << " " << xfr << std::endl;
std::size_t length;
char space;
if (!ec.failed() &&
std::istream(&_buffer) >> std::noskipws >> length >> space &&
space == ' ') //
{
if (length <= _buffer.size())
on_read_message(ec, 0);
else {
logger << "Reading " << length << " more to complete message"
<< std::endl;
async_read(_reader, _buffer,
asio::transfer_exactly(length - _buffer.size()),
boost::bind(&Plugin::on_read_message, this,
ph::error, ph::bytes_transferred));
}
}
}
void on_read_message(error_code ec, size_t xfr) {
logger << "DEBUG\t" << __FUNCTION__ << "\t" << ec.message() << " " << xfr << std::endl;
std::istream msg(&_buffer);
auto de = FromWireFormat<DiscoveryEvent>(msg);
if (de.event_type == DiscoveryEvent::DiscoveredPublication ||
de.event_type == DiscoveryEvent::DiscoveredSubscription)
logger << "INFO\t"
<< "Catch discovery event:" << de.event_type << ", "
<< de.entity->topic_name << ", " << de.entity->topic_type
<< std::endl;
else
logger << "INFO\t"
<< "Catch discovery event:" << de.event_type << std::endl;
if (!ec)
do_receive_message();
}
private:
protocol::socket& _reader;
asio::streambuf _buffer;
dds_entity_t _dp;
};
static dds_entity_t dds_create_participant(int, void*, void*) {
return dds_entity_t{
"key",
"participant_key",
"topic_name",
"topic_type",
true,
dds_qos_t{},
{
{"route1", RouteStatus{}},
{"route2", RouteStatus{}},
{"route3", RouteStatus{}},
},
};
}
int main(int argc, char* argv[])
{
// program can create reader for only one dds_topic
if (argc != 2) {
logger << "ERROR\tProvide topic name for forwading reader process" << std::endl;
return 1;
}
logger << "INFO\tProvided topic to read: " << argv[1] << std::endl;
// create domain_participant, reader and writer
// sockets to catch rodds::dds_discovery::DiscoveryEvent`s
logger << "INFO\tGenerate DDS domain participant" << std::endl;
asio::io_context ioc;
logger << "INFO\tCreate reader/writer sockets" << std::endl;
protocol::socket tx(make_strand(ioc)), rx(make_strand(ioc));
connect_pair(tx, rx);
// create Plugin instance
const auto dp = std::make_shared<dds_entity_t>( //
dds_create_participant(0, NULL, NULL));
Plugin plugin(*dp, rx);
DiscoveryEvent de { dp, DiscoveryEvent::DiscoveredPublication };
//logger << ToWireFormat(de) << std::endl;
send_discovery_event(*dp, tx, de);
de.event_type = DiscoveryEvent::DiscoveredSubscription;
send_discovery_event(*dp, tx, de);
auto work = make_work_guard(ioc);
//rodds::dds_discovery::run_discovery(dp, &tx);
ioc.run_for(1s);
tx.close();
work.reset();
ioc.run();
}
印刷:
INFO Provided topic to read: tn
INFO Generate DDS domain participant
INFO Create reader/writer sockets
INFO Plugin initialized
DEBUG Send discovery event
DEBUG Send discovery event
DEBUG Sent discovery event (153)
DEBUG Sent discovery event (153)
DEBUG on_read_length Success 4
DEBUG on_read_message Success 0
INFO Catch discovery event:0, topic_name, topic_type
DEBUG on_read_length Success 4
DEBUG on_read_message Success 0
INFO Catch discovery event:2, topic_name, topic_type
DEBUG on_read_length End of file 0
轉載請註明出處,本文鏈接:https://www.uj5u.com/yidong/508482.html
上一篇:異步存盤核心資料會破壞關系
下一篇:反應鉤子中的異步等待承諾拒絕