Conversation
|
it seems like you haven't added any nanpa changeset files to this PR. if this pull request includes changes to code, make sure to add a changeset, by writing a file to refer to the manpage for more information. |
livekit-ffi/protocol/metrics.proto
Outdated
There was a problem hiding this comment.
Let's not copy this file, it is going to be very hard to keep it up to date since it's a copy (we will also have to release a new FFI versions each time we add a metric).
Instead, let's build the livekit metrics protocol on the Python side and serialize it before sending it to the FFI. The FFI is just going to be a passthrough for sending it over the DC.
You can take this example on how you can send raw bytes from Python:
rust-sdks/livekit-ffi/protocol/room.proto
Line 100 in 0773bce
There was a problem hiding this comment.
On Python, it can simply be generated for https://github.com/livekit/python-sdks/tree/main/livekit-protocol
On Node, we can reuse: https://github.com/livekit/protocol/tree/main/packages/javascript
There was a problem hiding this comment.
Updated, please check.
livekit-ffi/protocol/ffi.proto
Outdated
| required uint64 local_participant_handle = 1; | ||
| repeated string str_data = 2; | ||
| repeated TimeSeriesMetric time_series = 3; | ||
| repeated EventMetric events = 4; | ||
| optional uint64 async_id = 5; |
There was a problem hiding this comment.
This will then become
| required uint64 local_participant_handle = 1; | |
| repeated string str_data = 2; | |
| repeated TimeSeriesMetric time_series = 3; | |
| repeated EventMetric events = 4; | |
| optional uint64 async_id = 5; | |
| required uint64 local_participant_handle = 1; | |
| required uint64 data_ptr = 2; | |
| required uint64 data_len = 3; | |
| optional uint64 async_id = 4; |
livekit/src/room/metrics/mod.rs
Outdated
| ..Default::default() | ||
| }; | ||
|
|
||
| let _ = rtc_engine.publish_data(data_packet, DataPacketKind::Reliable).await; |
There was a problem hiding this comment.
Don't ignore errors, this function should return a Result
There was a problem hiding this comment.
Handled, please check again
livekit/src/room/metrics/mod.rs
Outdated
| ) { | ||
| cancellation_token | ||
| .run_until_cancelled(async move { | ||
| std::thread::sleep(Duration::from_secs(1)); |
There was a problem hiding this comment.
This is blocking the event loop, use a tokio sleep instead or an async interval instead
livekit/src/room/mod.rs
Outdated
| #[derive(Clone)] | ||
| pub struct Room { | ||
| inner: Arc<RoomSession>, | ||
| cancellation_token: tokio_util::sync::CancellationToken |
There was a problem hiding this comment.
I think we can avoid the cancellation token. This seems unnecessary.
We can just keep track of the handle returned by tokio::spawn and await it on close.
To "close/cancel" the task, you can use a tokio::select pattern
There was a problem hiding this comment.
Made it better, removed cancellation token, please have another look.
livekit/src/room/metrics/mod.rs
Outdated
| &self, | ||
| room: &Room, | ||
| rtc_engine: &RtcEngine, | ||
| cancellation_token: CancellationToken, |
There was a problem hiding this comment.
The cancellation can be done beforehand directly in the task inside Room using tokio::select!
This function should be cancel safe:
https://docs.rs/tokio/latest/tokio/macro.select.html#cancellation-safety
livekit/src/room/metrics/mod.rs
Outdated
| room.local_participant().identity().into(), | ||
| ); | ||
|
|
||
| let _ = self.send_metrics(rtc_engine, strings, subscriber_ts_metrics).await; |
There was a problem hiding this comment.
Same here, let's return a Result
| fn create_time_series( | ||
| &self, | ||
| label: MetricLabel, | ||
| strings: &mut Vec<String>, |
There was a problem hiding this comment.
Any reason why this need to be a mutable reference? It seems like the "caller" doesn't expect it? So maybe it's fine to clone inside this method and keep the modifications in this scope
There was a problem hiding this comment.
Yes actually:
- Strings are modified here:
rust-sdks/livekit/src/room/metrics/mod.rs
Line 54 in fd33fe8
- but are created here:
rust-sdks/livekit/src/room/metrics/mod.rs
Line 230 in fd33fe8
livekit/src/room/metrics/mod.rs
Outdated
| room.local_participant().identity().into(), | ||
| ); | ||
|
|
||
| let _ = self.send_metrics(rtc_engine, strings, publisher_ts_metrics).await; |
There was a problem hiding this comment.
Same here, let's return a Result
livekit-ffi/generate_proto.sh
Outdated
| -I=$PROTOCOL \ | ||
| --prost_out=$OUT_RUST \ | ||
| --prost_opt=compile_well_known_types \ | ||
| --prost_opt=extern_path=.google.protobuf=::pbjson_types \ |
There was a problem hiding this comment.
Do we need json types? Seems like we never serialize to json?
There was a problem hiding this comment.
Not anymore, livekit-protocol was converting .google.protobuf types to pbjson_types but livekit-ffi was not doing so, it was messing up timestamps in metrics.proto. Since we have removed metrics proto from ffi, we don't need this anymore.
|
@anunaym14, is this ready to merge? |
| let handle = server.async_runtime.spawn(async move { | ||
| if let Err(err) = self_clone.data_tx.send(FfiDataPacket { | ||
| payload: DataPacket { | ||
| payload: data.to_vec(), // Avoid copy? |
There was a problem hiding this comment.
assuming this PR is still active, can we address the comment to avoid the copy here ?
| let data = unsafe { | ||
| slice::from_raw_parts(publish.data_ptr as *const u8, publish.data_len as usize) | ||
| } | ||
| .to_vec(); |
There was a problem hiding this comment.
does it copy the data here ?
No description provided.