Open3D (C++ API)  0.16.1
PeerConnectionManager.h
Go to the documentation of this file.
1// ----------------------------------------------------------------------------
2// - Open3D: www.open3d.org -
3// ----------------------------------------------------------------------------
4// The MIT License (MIT)
5//
6// Copyright (c) 2018-2021 www.open3d.org
7//
8// Permission is hereby granted, free of charge, to any person obtaining a copy
9// of this software and associated documentation files (the "Software"), to deal
10// in the Software without restriction, including without limitation the rights
11// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
12// copies of the Software, and to permit persons to whom the Software is
13// furnished to do so, subject to the following conditions:
14//
15// The above copyright notice and this permission notice shall be included in
16// all copies or substantial portions of the Software.
17//
18// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
19// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
20// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
21// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
22// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
23// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
24// IN THE SOFTWARE.
25// ----------------------------------------------------------------------------
26// ----------------------------------------------------------------------------
27// Contains source code from
28// https://github.com/mpromonet/webrtc-streamer
29//
30// This software is in the public domain, furnished "as is", without technical
31// support, and with no warranty, express or implied, as to its usefulness for
32// any purpose.
33// ----------------------------------------------------------------------------
34//
35// This is a private header. It shall be hidden from Open3D's public API. Do not
36// put this in Open3D.h.in.
37
38#pragma once
39
40#include <api/peer_connection_interface.h>
41#include <rtc_base/strings/json.h>
42
43#include <future>
44#include <mutex>
45#include <regex>
46#include <string>
47#include <thread>
48#include <unordered_map>
49
53
54namespace open3d {
55namespace visualization {
56namespace webrtc_server {
57
93 class VideoSink : public rtc::VideoSinkInterface<webrtc::VideoFrame> {
94 public:
95 VideoSink(webrtc::VideoTrackInterface* track) : track_(track) {
96 track_->AddOrUpdateSink(this, rtc::VideoSinkWants());
97 }
98 virtual ~VideoSink() { track_->RemoveSink(this); }
99
100 // VideoSinkInterface implementation
101 virtual void OnFrame(const webrtc::VideoFrame& video_frame) {
102 rtc::scoped_refptr<webrtc::I420BufferInterface> buffer(
103 video_frame.video_frame_buffer()->ToI420());
104 utility::LogDebug("[{}] frame: {}x{}", OPEN3D_FUNCTION,
105 buffer->height(), buffer->width());
106 }
107
108 protected:
109 rtc::scoped_refptr<webrtc::VideoTrackInterface> track_;
110 };
111
112 class SetSessionDescriptionObserver
113 : public webrtc::SetSessionDescriptionObserver {
114 public:
115 static SetSessionDescriptionObserver* Create(
116 webrtc::PeerConnectionInterface* pc,
117 std::promise<const webrtc::SessionDescriptionInterface*>&
118 promise) {
119 return new rtc::RefCountedObject<SetSessionDescriptionObserver>(
120 pc, promise);
121 }
122 virtual void OnSuccess() {
123 std::string sdp;
124 if (pc_->local_description()) {
125 promise_.set_value(pc_->local_description());
126 pc_->local_description()->ToString(&sdp);
127 } else if (pc_->remote_description()) {
128 promise_.set_value(pc_->remote_description());
129 pc_->remote_description()->ToString(&sdp);
130 }
131 }
132 virtual void OnFailure(webrtc::RTCError error) {
133 utility::LogWarning("{}", error.message());
134 promise_.set_value(nullptr);
135 }
136
137 protected:
138 SetSessionDescriptionObserver(
139 webrtc::PeerConnectionInterface* pc,
140 std::promise<const webrtc::SessionDescriptionInterface*>&
141 promise)
142 : pc_(pc), promise_(promise){};
143
144 private:
145 webrtc::PeerConnectionInterface* pc_;
146 std::promise<const webrtc::SessionDescriptionInterface*>& promise_;
147 };
148
149 class CreateSessionDescriptionObserver
150 : public webrtc::CreateSessionDescriptionObserver {
151 public:
152 static CreateSessionDescriptionObserver* Create(
153 webrtc::PeerConnectionInterface* pc,
154 std::promise<const webrtc::SessionDescriptionInterface*>&
155 promise) {
156 return new rtc::RefCountedObject<CreateSessionDescriptionObserver>(
157 pc, promise);
158 }
159 virtual void OnSuccess(webrtc::SessionDescriptionInterface* desc) {
160 std::string sdp;
161 desc->ToString(&sdp);
162 pc_->SetLocalDescription(
163 SetSessionDescriptionObserver::Create(pc_, promise_), desc);
164 }
165 virtual void OnFailure(webrtc::RTCError error) {
166 utility::LogWarning("{}", error.message());
167 promise_.set_value(nullptr);
168 }
169
170 protected:
171 CreateSessionDescriptionObserver(
172 webrtc::PeerConnectionInterface* pc,
173 std::promise<const webrtc::SessionDescriptionInterface*>&
174 promise)
175 : pc_(pc), promise_(promise){};
176
177 private:
178 webrtc::PeerConnectionInterface* pc_;
179 std::promise<const webrtc::SessionDescriptionInterface*>& promise_;
180 };
181
182 class PeerConnectionStatsCollectorCallback
183 : public webrtc::RTCStatsCollectorCallback {
184 public:
185 PeerConnectionStatsCollectorCallback() {}
186 void clearReport() { report_.clear(); }
187 Json::Value getReport() { return report_; }
188
189 protected:
190 virtual void OnStatsDelivered(
191 const rtc::scoped_refptr<const webrtc::RTCStatsReport>&
192 report) {
193 for (const webrtc::RTCStats& stats : *report) {
194 Json::Value stats_members;
195 for (const webrtc::RTCStatsMemberInterface* member :
196 stats.Members()) {
197 stats_members[member->name()] = member->ValueToString();
198 }
199 report_[stats.id()] = stats_members;
200 }
201 }
202
203 Json::Value report_;
204 };
205
206 class DataChannelObserver : public webrtc::DataChannelObserver {
207 public:
208 DataChannelObserver(
209 PeerConnectionManager* peer_connection_manager,
210 rtc::scoped_refptr<webrtc::DataChannelInterface> data_channel,
211 const std::string& peerid)
212 : peer_connection_manager_(peer_connection_manager),
213 data_channel_(data_channel),
214 peerid_(peerid) {
215 data_channel_->RegisterObserver(this);
216 }
217 virtual ~DataChannelObserver() { data_channel_->UnregisterObserver(); }
218
219 // DataChannelObserver interface
220 virtual void OnStateChange() {
221 // Useful to know when the data channel is established.
222 const std::string label = data_channel_->label();
223 const std::string state =
224 webrtc::DataChannelInterface::DataStateString(
225 data_channel_->state());
227 "DataChannelObserver::OnStateChange label: {}, state: {}, "
228 "peerid: {}",
229 label, state, peerid_);
230 std::string msg(label + " " + state);
231 webrtc::DataBuffer buffer(msg);
232 data_channel_->Send(buffer);
233 // ClientDataChannel is established after ServerDataChannel. Once
234 // ClientDataChannel is established, we need to send initial frames
235 // to the client such that the video is not empty. Afterwards,
236 // video frames will only be sent when the GUI redraws.
237 if (label == "ClientDataChannel" && state == "open") {
238 {
239 std::lock_guard<std::mutex> mutex_lock(
240 peer_connection_manager_
242 peer_connection_manager_->peerid_data_channel_ready_.insert(
243 peerid_);
244 }
245 peer_connection_manager_->SendInitFramesToPeer(peerid_);
246 }
247 if (label == "ClientDataChannel" &&
248 (state == "closed" || state == "closing")) {
249 std::lock_guard<std::mutex> mutex_lock(
250 peer_connection_manager_->peerid_data_channel_mutex_);
251 peer_connection_manager_->peerid_data_channel_ready_.erase(
252 peerid_);
253 }
254 }
255 virtual void OnMessage(const webrtc::DataBuffer& buffer) {
256 std::string msg((const char*)buffer.data.data(),
257 buffer.data.size());
258 utility::LogDebug("DataChannelObserver::OnMessage: {}, msg: {}.",
259 data_channel_->label(), msg);
260 std::string reply =
261 WebRTCWindowSystem::GetInstance()->OnDataChannelMessage(
262 msg);
263 if (!reply.empty()) {
264 webrtc::DataBuffer buffer(reply);
265 data_channel_->Send(buffer);
266 }
267 }
268
269 protected:
270 PeerConnectionManager* peer_connection_manager_;
271 rtc::scoped_refptr<webrtc::DataChannelInterface> data_channel_;
272 const std::string peerid_;
273 };
274
275 class PeerConnectionObserver : public webrtc::PeerConnectionObserver {
276 public:
277 PeerConnectionObserver(
278 PeerConnectionManager* peer_connection_manager,
279 const std::string& peerid,
280 const webrtc::PeerConnectionInterface::RTCConfiguration& config,
281 std::unique_ptr<cricket::PortAllocator> port_allocator)
282 : peer_connection_manager_(peer_connection_manager),
283 peerid_(peerid),
284 local_channel_(nullptr),
285 remote_channel_(nullptr),
286 ice_candidate_list_(Json::arrayValue),
287 deleting_(false) {
288 pc_ = peer_connection_manager_->peer_connection_factory_
289 ->CreatePeerConnection(config,
290 std::move(port_allocator),
291 nullptr, this);
292
293 if (pc_.get()) {
294 rtc::scoped_refptr<webrtc::DataChannelInterface> channel =
295 pc_->CreateDataChannel("ServerDataChannel", nullptr);
296 local_channel_ = new DataChannelObserver(
297 peer_connection_manager_, channel, peerid_);
298 }
299
300 stats_callback_ = new rtc::RefCountedObject<
301 PeerConnectionStatsCollectorCallback>();
302 };
303
304 virtual ~PeerConnectionObserver() {
305 delete local_channel_;
306 delete remote_channel_;
307 if (pc_.get()) {
308 // warning: pc->close call OnIceConnectionChange
309 deleting_ = true;
310 pc_->Close();
311 }
312 }
313
314 Json::Value GetIceCandidateList() { return ice_candidate_list_; }
315
316 Json::Value GetStats() {
317 stats_callback_->clearReport();
318 pc_->GetStats(stats_callback_);
319 int count = 10;
320 while ((stats_callback_->getReport().empty()) && (--count > 0)) {
321 std::this_thread::sleep_for(std::chrono::milliseconds(1000));
322 }
323 return Json::Value(stats_callback_->getReport());
324 };
325
326 rtc::scoped_refptr<webrtc::PeerConnectionInterface>
328 return pc_;
329 };
330
331 // PeerConnectionObserver interface
332 virtual void OnAddStream(
333 rtc::scoped_refptr<webrtc::MediaStreamInterface> stream) {
334 utility::LogDebug("[{}] GetVideoTracks().size(): {}.",
335 OPEN3D_FUNCTION, stream->GetVideoTracks().size());
336 webrtc::VideoTrackVector videoTracks = stream->GetVideoTracks();
337 if (videoTracks.size() > 0) {
338 video_sink_.reset(new VideoSink(videoTracks.at(0)));
339 }
340 }
341 virtual void OnRemoveStream(
342 rtc::scoped_refptr<webrtc::MediaStreamInterface> stream) {
343 video_sink_.reset();
344 }
345 virtual void OnDataChannel(
346 rtc::scoped_refptr<webrtc::DataChannelInterface> channel) {
348 "PeerConnectionObserver::OnDataChannel peerid: {}",
349 peerid_);
350 remote_channel_ = new DataChannelObserver(peer_connection_manager_,
351 channel, peerid_);
352 }
353 virtual void OnRenegotiationNeeded() {
354 std::lock_guard<std::mutex> mutex_lock(
355 peer_connection_manager_->peerid_data_channel_mutex_);
356 peer_connection_manager_->peerid_data_channel_ready_.erase(peerid_);
358 "PeerConnectionObserver::OnRenegotiationNeeded peerid: {}",
359 peerid_);
360 }
361 virtual void OnIceCandidate(
362 const webrtc::IceCandidateInterface* candidate);
363
364 virtual void OnSignalingChange(
365 webrtc::PeerConnectionInterface::SignalingState state) {
366 utility::LogDebug("state: {}, peerid: {}", state, peerid_);
367 }
368 virtual void OnIceConnectionChange(
369 webrtc::PeerConnectionInterface::IceConnectionState state) {
370 if ((state ==
371 webrtc::PeerConnectionInterface::kIceConnectionFailed) ||
372 (state ==
373 webrtc::PeerConnectionInterface::kIceConnectionClosed)) {
374 ice_candidate_list_.clear();
375 if (!deleting_) {
376 std::thread([this]() {
377 peer_connection_manager_->HangUp(peerid_);
378 }).detach();
379 }
380 }
381 }
382
383 virtual void OnIceGatheringChange(
384 webrtc::PeerConnectionInterface::IceGatheringState) {}
385
386 private:
387 PeerConnectionManager* peer_connection_manager_;
388 const std::string peerid_;
389 rtc::scoped_refptr<webrtc::PeerConnectionInterface> pc_;
390 DataChannelObserver* local_channel_;
391 DataChannelObserver* remote_channel_;
392 Json::Value ice_candidate_list_;
393 rtc::scoped_refptr<PeerConnectionStatsCollectorCallback>
394 stats_callback_;
395 std::unique_ptr<VideoSink> video_sink_;
396 bool deleting_;
397 };
398
399public:
400 PeerConnectionManager(const std::list<std::string>& ice_server_list,
401 const Json::Value& config,
402 const std::string& publish_filter,
403 const std::string& webrtc_udp_port_range);
404 virtual ~PeerConnectionManager();
405
407 const std::map<std::string, HttpServerRequestHandler::HttpFunction>
408 GetHttpApi();
409
410 const Json::Value GetIceCandidateList(const std::string& peerid);
411 const Json::Value AddIceCandidate(const std::string& peerid,
412 const Json::Value& json_message);
413 const Json::Value GetMediaList();
414 const Json::Value HangUp(const std::string& peerid);
415 const Json::Value Call(const std::string& peerid,
416 const std::string& window_uid,
417 const std::string& options,
418 const Json::Value& json_message);
419 const Json::Value GetIceServers();
420
421 void SendInitFramesToPeer(const std::string& peerid);
422
423 void CloseWindowConnections(const std::string& window_uid);
424
425 void OnFrame(const std::string& window_uid,
426 const std::shared_ptr<core::Tensor>& im);
427
428protected:
429 rtc::scoped_refptr<BitmapTrackSourceInterface> GetVideoTrackSource(
430 const std::string& window_uid);
431 PeerConnectionObserver* CreatePeerConnection(const std::string& peerid);
432 bool AddStreams(webrtc::PeerConnectionInterface* peer_connection,
433 const std::string& window_uid,
434 const std::string& options);
435 rtc::scoped_refptr<BitmapTrackSourceInterface> CreateVideoSource(
436 const std::string& window_uid,
437 const std::map<std::string, std::string>& opts);
438 bool WindowStillUsed(const std::string& window_uid);
439 rtc::scoped_refptr<webrtc::PeerConnectionInterface> GetPeerConnection(
440 const std::string& peerid);
441
442protected:
443 std::unique_ptr<webrtc::TaskQueueFactory> task_queue_factory_;
444 rtc::scoped_refptr<webrtc::PeerConnectionFactoryInterface>
446
447 // Each peer has exactly one connection.
448 std::unordered_map<std::string, PeerConnectionObserver*>
451 // Set of peerids with data channel ready for communication
452 std::unordered_set<std::string> peerid_data_channel_ready_;
454
455 // Each Window has exactly one TrackSource.
456 std::unordered_map<std::string,
457 rtc::scoped_refptr<BitmapTrackSourceInterface>>
460
461 // Each Window can be connected to zero, one or more peers.
462 std::unordered_map<std::string, std::set<std::string>>
464 std::unordered_map<std::string, std::string> peerid_to_window_uid_;
465 // Shared by window_uid_to_peerids_ and peerid_to_window_uid_.
467
468 std::list<std::string> ice_server_list_;
469 const Json::Value config_;
470 const std::regex publish_filter_;
471 std::map<std::string, HttpServerRequestHandler::HttpFunction> func_;
473};
474
475} // namespace webrtc_server
476} // namespace visualization
477} // namespace open3d
#define LogWarning(...)
Definition: Logging.h:79
#define LogInfo(...)
Definition: Logging.h:89
#define LogDebug(...)
Definition: Logging.h:98
#define OPEN3D_FUNCTION
Definition: Macro.h:59
PeerConnectionManager(const std::list< std::string > &ice_server_list, const Json::Value &config, const std::string &publish_filter, const std::string &webrtc_udp_port_range)
Definition: PeerConnectionManager.cpp:145
const Json::Value AddIceCandidate(const std::string &peerid, const Json::Value &json_message)
Definition: PeerConnectionManager.cpp:270
std::mutex peerid_data_channel_mutex_
Definition: PeerConnectionManager.h:453
PeerConnectionObserver * CreatePeerConnection(const std::string &peerid)
Definition: PeerConnectionManager.cpp:541
virtual ~PeerConnectionManager()
Definition: PeerConnectionManager.cpp:219
std::mutex window_uid_to_peerids_mutex_
Definition: PeerConnectionManager.h:466
rtc::scoped_refptr< BitmapTrackSourceInterface > CreateVideoSource(const std::string &window_uid, const std::map< std::string, std::string > &opts)
Definition: PeerConnectionManager.cpp:582
bool WindowStillUsed(const std::string &window_uid)
Definition: PeerConnectionManager.cpp:436
std::mutex peerid_to_connection_mutex_
Definition: PeerConnectionManager.h:450
const Json::Value Call(const std::string &peerid, const std::string &window_uid, const std::string &options, const Json::Value &json_message)
Definition: PeerConnectionManager.cpp:324
std::unique_ptr< webrtc::TaskQueueFactory > task_queue_factory_
Definition: PeerConnectionManager.h:443
std::unordered_map< std::string, std::set< std::string > > window_uid_to_peerids_
Definition: PeerConnectionManager.h:463
bool InitializePeerConnection()
Definition: PeerConnectionManager.cpp:535
void SendInitFramesToPeer(const std::string &peerid)
Definition: PeerConnectionManager.cpp:727
std::unordered_map< std::string, std::string > peerid_to_window_uid_
Definition: PeerConnectionManager.h:464
const Json::Value GetIceCandidateList(const std::string &peerid)
Definition: PeerConnectionManager.cpp:518
const Json::Value HangUp(const std::string &peerid)
Definition: PeerConnectionManager.cpp:454
const Json::Value GetMediaList()
Definition: PeerConnectionManager.cpp:222
void OnFrame(const std::string &window_uid, const std::shared_ptr< core::Tensor > &im)
Definition: PeerConnectionManager.cpp:751
std::list< std::string > ice_server_list_
Definition: PeerConnectionManager.h:468
const Json::Value GetIceServers()
Definition: PeerConnectionManager.cpp:236
const std::map< std::string, HttpServerRequestHandler::HttpFunction > GetHttpApi()
Definition: PeerConnectionManager.cpp:513
std::map< std::string, HttpServerRequestHandler::HttpFunction > func_
Definition: PeerConnectionManager.h:471
std::string webrtc_port_range_
Definition: PeerConnectionManager.h:472
std::unordered_map< std::string, PeerConnectionObserver * > peerid_to_connection_
Definition: PeerConnectionManager.h:449
void CloseWindowConnections(const std::string &window_uid)
Definition: PeerConnectionManager.cpp:733
rtc::scoped_refptr< webrtc::PeerConnectionInterface > GetPeerConnection(const std::string &peerid)
Definition: PeerConnectionManager.cpp:260
std::unordered_set< std::string > peerid_data_channel_ready_
Definition: PeerConnectionManager.h:452
const std::regex publish_filter_
Definition: PeerConnectionManager.h:470
rtc::scoped_refptr< BitmapTrackSourceInterface > GetVideoTrackSource(const std::string &window_uid)
Definition: PeerConnectionManager.cpp:715
bool AddStreams(webrtc::PeerConnectionInterface *peer_connection, const std::string &window_uid, const std::string &options)
Definition: PeerConnectionManager.cpp:594
std::mutex window_uid_to_track_source_mutex_
Definition: PeerConnectionManager.h:459
const Json::Value config_
Definition: PeerConnectionManager.h:469
std::unordered_map< std::string, rtc::scoped_refptr< BitmapTrackSourceInterface > > window_uid_to_track_source_
Definition: PeerConnectionManager.h:458
rtc::scoped_refptr< webrtc::PeerConnectionFactoryInterface > peer_connection_factory_
Definition: PeerConnectionManager.h:445
static std::shared_ptr< WebRTCWindowSystem > GetInstance()
Definition: WebRTCWindowSystem.cpp:130
int count
Definition: FilePCD.cpp:61
Definition: PinholeCameraIntrinsic.cpp:35