Skip to content

Commit bc9c0b3

Browse files
committed
DPL: improve reliability of --driver-client-backend ws://
- Flush messages after handshake - Fix handling of splitted frames - Move to `ControlServiceHelpers::processCommand()`
1 parent f14182b commit bc9c0b3

File tree

10 files changed

+72
-45
lines changed

10 files changed

+72
-45
lines changed

Framework/Core/src/ControlService.cxx

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ void ControlService::readyToQuit(QuitRequest what)
5555
mDriverClient.tell("CONTROL_ACTION: READY_TO_QUIT_ME");
5656
break;
5757
}
58+
mDriverClient.flushPending();
5859
}
5960

6061
void ControlService::notifyStreamingState(StreamingState state)
@@ -73,6 +74,7 @@ void ControlService::notifyStreamingState(StreamingState state)
7374
default:
7475
throw std::runtime_error("Unknown streaming state");
7576
}
77+
mDriverClient.flushPending();
7678
}
7779

7880
} // namespace o2::framework

Framework/Core/src/ControlServiceHelpers.cxx

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,13 +35,13 @@ void ControlServiceHelpers::processCommand(std::vector<DeviceInfo>& infos,
3535
std::string const& command,
3636
std::string const& arg)
3737
{
38-
auto doToMatchingPid = [](std::vector<DeviceInfo>& infos, int pid, auto lambda) {
38+
auto doToMatchingPid = [](std::vector<DeviceInfo>& infos, pid_t pid, auto lambda) {
3939
for (auto& deviceInfo : infos) {
4040
if (deviceInfo.pid == pid) {
41-
lambda(deviceInfo);
42-
break;
41+
return lambda(deviceInfo);
4342
}
4443
}
44+
LOGP(error, "Command received for pid {} which does not exists.", pid);
4545
};
4646
LOGP(debug2, "Found control command {} from pid {} with argument {}.", command, pid, arg);
4747
if (command == "QUIT" && arg == "ALL") {
@@ -59,6 +59,8 @@ void ControlServiceHelpers::processCommand(std::vector<DeviceInfo>& infos,
5959
} else if (command == "NOTIFY_STREAMING_STATE" && arg == "EOS") {
6060
// FIXME: this should really be a policy...
6161
doToMatchingPid(infos, pid, [](DeviceInfo& info) { info.streamingState = StreamingState::EndOfStreaming; });
62+
} else {
63+
LOGP(error, "Unknown command {} with argument {}", command, arg);
6264
}
6365
};
6466

Framework/Core/src/DPLWebSocket.cxx

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -191,10 +191,11 @@ void websocket_client_callback(uv_stream_t* stream, ssize_t nread, const uv_buf_
191191
}
192192

193193
// FIXME: mNonce should be random
194-
WSDPLClient::WSDPLClient(uv_stream_t* s, DeviceSpec const& spec)
194+
WSDPLClient::WSDPLClient(uv_stream_t* s, DeviceSpec const& spec, std::function<void()> handshake)
195195
: mStream{s},
196196
mNonce{"dGhlIHNhbXBsZSBub25jZQ=="},
197-
mSpec{spec}
197+
mSpec{spec},
198+
mHandshake{handshake}
198199
{
199200
s->data = this;
200201
uv_read_start((uv_stream_t*)s, (uv_alloc_cb)my_alloc_cb, websocket_client_callback);
@@ -257,6 +258,7 @@ void WSDPLClient::endHeaders()
257258
LOG(INFO) << "Correctly handshaken websocket connection.";
258259
/// Create an appropriate reply
259260
mHandshaken = true;
261+
mHandshake();
260262
}
261263

262264
void ws_client_write_callback(uv_write_t* h, int status)

Framework/Core/src/DPLWebSocket.h

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
#include <memory>
1616
#include <string>
1717
#include <map>
18+
#include <functional>
1819

1920
class uv_stream_s;
2021

@@ -53,7 +54,9 @@ struct WSDPLHandler : public HTTPParser {
5354
struct WSDPLClient : public HTTPParser {
5455
/// @a stream where the communication happens and @a spec of the device connecting
5556
/// to the driver.
56-
WSDPLClient(uv_stream_t* stream, DeviceSpec const& spec);
57+
/// @a spec the DeviceSpec associated with this client
58+
/// @a handshake a callback to invoke whenever we have a successful handshake
59+
WSDPLClient(uv_stream_t* stream, DeviceSpec const& spec, std::function<void()> handshake);
5760
void replyVersion(std::string_view const& s) override;
5861
void replyCode(std::string_view const& s) override;
5962
void header(std::string_view const& k, std::string_view const& v) override;
@@ -72,6 +75,7 @@ struct WSDPLClient : public HTTPParser {
7275
std::string mNonce;
7376
DeviceSpec const& mSpec;
7477
bool mHandshaken = false;
78+
std::function<void()> mHandshake;
7579
uv_stream_t* mStream = nullptr;
7680
std::map<std::string, std::string> mHeaders;
7781
};

Framework/Core/src/HTTPParser.cxx

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,19 @@ void encode_websocket_frames(std::vector<uv_buf_t>& outputs, char const* src, si
8888

8989
void decode_websocket(char* start, size_t size, WebSocketHandler& handler)
9090
{
91-
char* cur = start;
91+
// Handle the case the previous message was cut in half
92+
// by the I/O stack.
93+
char* cur = start + handler.remainingSize;
94+
if (handler.remainingSize) {
95+
assert(handler.pendingBuffer);
96+
memcpy(handler.pendingBuffer + handler.pendingSize, start, handler.remainingSize);
97+
size_t pendingProcessingSize = handler.pendingSize + handler.remainingSize;
98+
handler.remainingSize = 0;
99+
// One recursion should be enough.
100+
decode_websocket(handler.pendingBuffer, pendingProcessingSize, handler);
101+
delete[] handler.pendingBuffer;
102+
handler.pendingBuffer = nullptr;
103+
}
92104
handler.beginChunk();
93105
while (cur - start < size) {
94106
WebSocketFrameTiny* header = (WebSocketFrameTiny*)cur;
@@ -106,9 +118,19 @@ void decode_websocket(char* start, size_t size, WebSocketHandler& handler)
106118
payloadSize = headerSmall->len64;
107119
headerSize = 2 + 8 + (header->mask ? 4 : 0);
108120
}
121+
size_t availableSize = size - (cur - start);
122+
/// FIXME: handle the case in which the header itself is cut
123+
/// apart.
124+
if (availableSize < payloadSize + headerSize) {
125+
handler.remainingSize = payloadSize + headerSize - availableSize;
126+
handler.pendingSize = availableSize;
127+
handler.pendingBuffer = new char[payloadSize + headerSize];
128+
memcpy(handler.pendingBuffer, cur, availableSize);
129+
break;
130+
}
109131
if (header->mask) {
110132
int32_t mask = *(int32_t*)(cur + headerSize - 4);
111-
memunmask(cur, payloadSize, mask);
133+
memunmask(cur + headerSize, payloadSize, mask);
112134
}
113135
handler.frame(cur + headerSize, payloadSize);
114136
cur += headerSize + payloadSize;

Framework/Core/src/HTTPParser.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,13 @@ struct WebSocketHandler {
102102
virtual void endFragmentation() {}
103103
/// FIXME: not implemented
104104
virtual void control(char const* frame, size_t s) {}
105+
106+
/// Bytes which are still to be received for the previous, half delivered frame.
107+
size_t remainingSize = 0;
108+
/// Bytes which are already there from the previous, half delivered frame.
109+
size_t pendingSize = 0;
110+
/// A buffer large enough to contain the next frame to be processed.
111+
char* pendingBuffer = nullptr;
105112
};
106113

107114
/// Decoder for websocket data. For now we assume that the frame was not split. However multiple

Framework/Core/src/WSDriverClient.cxx

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,10 @@ void on_connect(uv_connect_t* connection, int status)
2525
return;
2626
}
2727
WSDriverClient* client = (WSDriverClient*)connection->data;
28-
client->setDPLClient(std::make_unique<WSDPLClient>(connection->handle, client->spec()));
28+
auto onHandshake = [client]() {
29+
client->flushPending();
30+
};
31+
client->setDPLClient(std::make_unique<WSDPLClient>(connection->handle, client->spec(), onHandshake));
2932
client->sendHandshake();
3033
}
3134

Framework/Core/src/WSDriverClient.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212

1313
#include "Framework/DriverClient.h"
1414
#include <uv.h>
15+
#include <functional>
1516
#include <memory>
1617
#include <string>
1718
#include <vector>

Framework/Core/src/runDataProcessing.cxx

Lines changed: 5 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -449,38 +449,6 @@ void updateMetricsNames(DriverInfo& driverInfo, std::vector<DeviceMetricsInfo> c
449449
driverInfo.availableMetrics.swap(result);
450450
}
451451

452-
void processCommand(DeviceInfos& infos,
453-
pid_t pid,
454-
std::string const& command,
455-
std::string const& arg)
456-
{
457-
auto doToMatchingPid = [](std::vector<DeviceInfo>& infos, int pid, auto lambda) {
458-
for (auto& deviceInfo : infos) {
459-
if (deviceInfo.pid == pid) {
460-
lambda(deviceInfo);
461-
break;
462-
}
463-
}
464-
};
465-
LOGP(info, "Found control command {} from pid {} with argument {}.", command, pid, arg);
466-
if (command == "QUIT" && arg == "ALL") {
467-
for (auto& deviceInfo : infos) {
468-
deviceInfo.readyToQuit = true;
469-
}
470-
} else if (command == "QUIT" && arg == "ME") {
471-
doToMatchingPid(infos, pid, [](DeviceInfo& info) { info.readyToQuit = true; });
472-
} else if (command == "NOTIFY_STREAMING_STATE" && arg == "IDLE") {
473-
// FIXME: this should really be a policy...
474-
doToMatchingPid(infos, pid, [](DeviceInfo& info) { info.readyToQuit = true; info.streamingState = StreamingState::Idle; });
475-
} else if (command == "NOTIFY_STREAMING_STATE" && arg == "STREAMING") {
476-
// FIXME: this should really be a policy...
477-
doToMatchingPid(infos, pid, [](DeviceInfo& info) { info.streamingState = StreamingState::Streaming; });
478-
} else if (command == "NOTIFY_STREAMING_STATE" && arg == "EOS") {
479-
// FIXME: this should really be a policy...
480-
doToMatchingPid(infos, pid, [](DeviceInfo& info) { info.streamingState = StreamingState::EndOfStreaming; });
481-
}
482-
};
483-
484452
/// An handler for a websocket message stream.
485453
struct ControlWebSocketHandler : public WebSocketHandler {
486454
ControlWebSocketHandler(DriverServerContext& context)
@@ -528,6 +496,7 @@ struct ControlWebSocketHandler : public WebSocketHandler {
528496
ParsedConfigMatch configMatch;
529497
ParsedMetricMatch metricMatch;
530498

499+
LOG(debug3) << "Data received: " << std::string_view(frame, s);
531500
if (DeviceMetricsHelper::parseMetric(token, metricMatch)) {
532501
// We use this callback to cache which metrics are needed to provide a
533502
// the DataRelayer view.
@@ -536,15 +505,16 @@ struct ControlWebSocketHandler : public WebSocketHandler {
536505
didProcessMetric = true;
537506
didHaveNewMetric |= hasNewMetric;
538507
} else if (ControlServiceHelpers::parseControl(token, match)) {
539-
LOG(debug2) << "Found a command, processing for pid " << mPid;
508+
LOG(error) << "Found a command, processing for pid " << mPid;
540509
assert(mContext.infos);
541-
processCommand(*mContext.infos, mPid, match[1].str(), match[2].str());
510+
ControlServiceHelpers::processCommand(*mContext.infos, mPid, match[1].str(), match[2].str());
542511
} else if (DeviceConfigHelper::parseConfig(std::string{" "} + token, configMatch)) {
543512
LOG(debug2) << "Found configuration information for pid " << mPid;
544513
assert(mContext.infos);
545514
DeviceConfigHelper::processConfig(configMatch, (*mContext.infos)[mIndex]);
515+
} else {
516+
LOG(error) << "Unexpected control data: " << std::string_view(frame, s);
546517
}
547-
LOG(debug3) << "Data received: " << std::string_view(frame, s);
548518
}
549519

550520
/// FIXME: not implemented

Framework/Core/test/test_HTTPParser.cxx

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ class TestWSHandler : public WebSocketHandler
8888
std::vector<size_t> mSize;
8989
void frame(const char* f, size_t s) final
9090
{
91-
mFrame.push_back(f);
91+
mFrame.push_back(strdup(f));
9292
mSize.push_back(s);
9393
}
9494
};
@@ -212,6 +212,20 @@ BOOST_AUTO_TEST_CASE(HTTPParser1)
212212
BOOST_REQUIRE_EQUAL(std::string(handler.mFrame[1], handler.mSize[1] - 1), std::string(buffer2));
213213
}
214214
{
215+
// Decode a frame which is split in two.
216+
char* buffer = strdup("hello websockets!1");
217+
std::vector<uv_buf_t> encoded;
218+
encode_websocket_frames(encoded, buffer, strlen(buffer) + 1, WebSocketOpCode::Binary, 0);
219+
BOOST_REQUIRE_EQUAL(encoded.size(), 1);
220+
221+
TestWSHandler handler;
222+
decode_websocket(encoded[0].base, encoded[0].len / 2, handler);
223+
decode_websocket(encoded[0].base + encoded[0].len / 2, encoded[0].len - encoded[0].len / 2, handler);
224+
BOOST_REQUIRE_EQUAL(handler.mFrame.size(), 1);
225+
BOOST_REQUIRE_EQUAL(handler.mSize.size(), 1);
226+
BOOST_REQUIRE_EQUAL(std::string(handler.mFrame[0], handler.mSize[0] - 1), std::string(buffer));
227+
}
228+
{} {
215229
std::string checkRequest =
216230
"GET /chat HTTP/1.1\r\n"
217231
"Upgrade: websocket\r\n"

0 commit comments

Comments
 (0)