ice_transport: fix regression on pending when bumping to pj 2.10

With these changes, Jami only got on_data_sent for data that Jami
send. In previous state, some on_data_sent comes from TURN refresh
because it's managed by the ice_session in pjsip.

Change-Id: I76109f9662aeca01ffccd884dac2a5aab7ab2503
This commit is contained in:
Sébastien Blin
2020-04-27 15:54:51 -04:00
parent d931f140f1
commit 7255f3bb07
2 changed files with 78 additions and 49 deletions

View File

@ -19,6 +19,7 @@ Rebased for pjsip 2.10 by Peymane Marandi
<paymon@savoirfairelinux.com>
on behalf of Savoir-faire Linux.
---
pjnath/include/pjnath/ice_session.h | 173 +++-
pjnath/include/pjnath/ice_strans.h | 21 +
pjnath/include/pjnath/stun_session.h | 75 +-
@ -28,7 +29,7 @@ on behalf of Savoir-faire Linux.
pjnath/src/pjnath-test/sess_auth.c | 14 +-
pjnath/src/pjnath-test/stun_sock_test.c | 7 +-
pjnath/src/pjnath/ice_session.c | 515 ++++++++++-
pjnath/src/pjnath/ice_strans.c | 759 +++++++++++++---
pjnath/src/pjnath/ice_strans.c | 771 +++++++++++++---
pjnath/src/pjnath/nat_detect.c | 7 +-
pjnath/src/pjnath/stun_session.c | 15 +-
pjnath/src/pjnath/stun_sock.c | 1074 +++++++++++++++++++----
@ -40,7 +41,7 @@ on behalf of Savoir-faire Linux.
pjnath/src/pjturn-srv/server.c | 2 +-
pjsip-apps/src/samples/icedemo.c | 116 ++-
pjsip/src/pjsua-lib/pjsua_core.c | 2 +-
21 files changed, 2498 insertions(+), 400 deletions(-)
21 files changed, 2508 insertions(+), 402 deletions(-)
diff --git a/pjnath/include/pjnath/ice_session.h b/pjnath/include/pjnath/ice_session.h
index 8971220f0..39c197c29 100644
@ -1387,7 +1388,7 @@ index 2a4125bc5..d2eae9494 100644
}
}
diff --git a/pjnath/src/pjnath/ice_strans.c b/pjnath/src/pjnath/ice_strans.c
index 3cb350c2a..40ae403d0 100644
index 3cb350c2a..dee666e2a 100644
--- a/pjnath/src/pjnath/ice_strans.c
+++ b/pjnath/src/pjnath/ice_strans.c
@@ -69,6 +69,7 @@ enum tp_type
@ -1455,7 +1456,7 @@ index 3cb350c2a..40ae403d0 100644
/* Pending send buffer */
typedef struct pending_send
{
@@ -225,6 +262,12 @@ struct pj_ice_strans
@@ -225,6 +262,13 @@ struct pj_ice_strans
pj_bool_t destroy_req;/**< Destroy has been called? */
pj_bool_t cb_called; /**< Init error callback called?*/
pj_bool_t call_send_cb;/**< Need to call send cb? */
@ -1465,10 +1466,11 @@ index 3cb350c2a..40ae403d0 100644
+ pj_uint16_t rx_buffer_size;
+ pj_uint16_t rx_wanted_size;
+
+ unsigned last_data_len; /**< What the application is waiting. */
};
@@ -261,6 +304,7 @@ PJ_DEF(void) pj_ice_strans_cfg_default(pj_ice_strans_cfg *cfg)
@@ -261,6 +305,7 @@ PJ_DEF(void) pj_ice_strans_cfg_default(pj_ice_strans_cfg *cfg)
pj_bzero(cfg, sizeof(*cfg));
cfg->af = pj_AF_INET();
@ -1476,7 +1478,7 @@ index 3cb350c2a..40ae403d0 100644
pj_stun_config_init(&cfg->stun_cfg, NULL, 0, NULL, NULL);
pj_ice_strans_stun_cfg_default(&cfg->stun);
pj_ice_strans_turn_cfg_default(&cfg->turn);
@@ -278,6 +322,7 @@ PJ_DEF(void) pj_ice_strans_stun_cfg_default(pj_ice_strans_stun_cfg *cfg)
@@ -278,6 +323,7 @@ PJ_DEF(void) pj_ice_strans_stun_cfg_default(pj_ice_strans_stun_cfg *cfg)
pj_bzero(cfg, sizeof(*cfg));
cfg->af = pj_AF_INET();
@ -1484,7 +1486,7 @@ index 3cb350c2a..40ae403d0 100644
cfg->port = PJ_STUN_PORT;
cfg->max_host_cands = 64;
cfg->ignore_stun_error = PJ_FALSE;
@@ -421,6 +466,9 @@ static pj_status_t add_update_turn(pj_ice_strans *ice_st,
@@ -421,6 +467,9 @@ static pj_status_t add_update_turn(pj_ice_strans *ice_st,
cand->transport_id = tp_id;
cand->comp_id = (pj_uint8_t) comp->comp_id;
new_cand = PJ_TRUE;
@ -1494,7 +1496,7 @@ index 3cb350c2a..40ae403d0 100644
}
/* Allocate and initialize TURN socket data */
@@ -428,6 +476,10 @@ static pj_status_t add_update_turn(pj_ice_strans *ice_st,
@@ -428,6 +477,10 @@ static pj_status_t add_update_turn(pj_ice_strans *ice_st,
data->comp = comp;
data->transport_id = cand->transport_id;
@ -1505,7 +1507,7 @@ index 3cb350c2a..40ae403d0 100644
/* Create the TURN transport */
status = pj_turn_sock_create(&ice_st->cfg.stun_cfg, turn_cfg->af,
turn_cfg->conn_type,
@@ -465,7 +517,7 @@ static pj_status_t add_update_turn(pj_ice_strans *ice_st,
@@ -465,7 +518,7 @@ static pj_status_t add_update_turn(pj_ice_strans *ice_st,
return PJ_SUCCESS;
}
@ -1514,7 +1516,7 @@ index 3cb350c2a..40ae403d0 100644
pj_ice_sess_cand *rcand)
{
if (lcand == NULL && rcand == NULL){
@@ -474,23 +526,23 @@ static pj_bool_t ice_cand_equals(pj_ice_sess_cand *lcand,
@@ -474,23 +527,23 @@ static pj_bool_t ice_cand_equals(pj_ice_sess_cand *lcand,
if (lcand == NULL || rcand == NULL){
return PJ_FALSE;
}
@ -1541,7 +1543,7 @@ index 3cb350c2a..40ae403d0 100644
static pj_status_t add_stun_and_host(pj_ice_strans *ice_st,
pj_ice_strans_comp *comp,
unsigned idx,
@@ -541,6 +593,9 @@ static pj_status_t add_stun_and_host(pj_ice_strans *ice_st,
@@ -541,6 +594,9 @@ static pj_status_t add_stun_and_host(pj_ice_strans *ice_st,
cand->local_pref = SRFLX_PREF;
cand->transport_id = CREATE_TP_ID(TP_STUN, idx);
cand->comp_id = (pj_uint8_t) comp->comp_id;
@ -1551,7 +1553,7 @@ index 3cb350c2a..40ae403d0 100644
/* Allocate and initialize STUN socket data */
data = PJ_POOL_ZALLOC_T(ice_st->pool, sock_user_data);
@@ -549,8 +604,9 @@ static pj_status_t add_stun_and_host(pj_ice_strans *ice_st,
@@ -549,8 +605,9 @@ static pj_status_t add_stun_and_host(pj_ice_strans *ice_st,
/* Create the STUN transport */
status = pj_stun_sock_create(&ice_st->cfg.stun_cfg, NULL,
@ -1563,7 +1565,7 @@ index 3cb350c2a..40ae403d0 100644
if (status != PJ_SUCCESS)
return status;
@@ -635,105 +691,154 @@ static pj_status_t add_stun_and_host(pj_ice_strans *ice_st,
@@ -635,105 +692,154 @@ static pj_status_t add_stun_and_host(pj_ice_strans *ice_st,
}
for (i = 0; i < stun_sock_info.alias_cnt &&
@ -1774,7 +1776,7 @@ index 3cb350c2a..40ae403d0 100644
+ (*cand_cnt)++;
+ (*max_cand_cnt)--;
+ }
+
+ pj_ice_calc_foundation(ice_st->pool, &cand->foundation,
+ cand->type, &cand->base_addr);
+
@ -1788,7 +1790,7 @@ index 3cb350c2a..40ae403d0 100644
+ {
+ comp->default_cand = (unsigned)(cand - comp->cand_list);
+ }
+
+ if (transport == PJ_CAND_TCP_ACTIVE) {
+ // Use the port 9 (DISCARD Protocol) for TCP active candidates.
+ pj_sockaddr_set_port(&cand->addr, 9);
@ -1805,7 +1807,7 @@ index 3cb350c2a..40ae403d0 100644
/*
* Create the component.
@@ -816,7 +921,7 @@ static pj_status_t alloc_send_buf(pj_ice_strans *ice_st, unsigned buf_size)
@@ -816,7 +922,7 @@ static pj_status_t alloc_send_buf(pj_ice_strans *ice_st, unsigned buf_size)
{
if (buf_size > ice_st->buf_size) {
unsigned i;
@ -1814,7 +1816,7 @@ index 3cb350c2a..40ae403d0 100644
if (ice_st->is_pending) {
/* The current buffer is insufficient, but still currently used.*/
return PJ_EBUSY;
@@ -839,7 +944,7 @@ static pj_status_t alloc_send_buf(pj_ice_strans *ice_st, unsigned buf_size)
@@ -839,7 +945,7 @@ static pj_status_t alloc_send_buf(pj_ice_strans *ice_st, unsigned buf_size)
}
ice_st->buf_idx = ice_st->empty_idx = 0;
}
@ -1823,7 +1825,7 @@ index 3cb350c2a..40ae403d0 100644
return PJ_SUCCESS;
}
@@ -906,7 +1011,7 @@ PJ_DEF(pj_status_t) pj_ice_strans_create( const char *name,
@@ -906,7 +1012,7 @@ PJ_DEF(pj_status_t) pj_ice_strans_create( const char *name,
/* To maintain backward compatibility, check if old/deprecated setting is set
* and the new setting is not, copy the value to the new setting.
*/
@ -1832,7 +1834,7 @@ index 3cb350c2a..40ae403d0 100644
(cfg->stun.server.slen || cfg->stun.max_host_cands))
{
ice_st->cfg.stun_tp_cnt = 1;
@@ -1105,7 +1210,7 @@ static void sess_init_update(pj_ice_strans *ice_st)
@@ -1105,7 +1211,7 @@ static void sess_init_update(pj_ice_strans *ice_st)
pj_ice_get_cand_type_name(cand->type)));
return;
}
@ -1841,7 +1843,7 @@ index 3cb350c2a..40ae403d0 100644
if (status == PJ_EUNKNOWN) {
status = cand->status;
} else {
@@ -1114,7 +1219,7 @@ static void sess_init_update(pj_ice_strans *ice_st)
@@ -1114,7 +1220,7 @@ static void sess_init_update(pj_ice_strans *ice_st)
status = PJ_SUCCESS;
}
}
@ -1850,7 +1852,7 @@ index 3cb350c2a..40ae403d0 100644
if (status != PJ_SUCCESS)
break;
}
@@ -1207,6 +1312,12 @@ PJ_DEF(pj_status_t) pj_ice_strans_init_ice(pj_ice_strans *ice_st,
@@ -1207,6 +1313,12 @@ PJ_DEF(pj_status_t) pj_ice_strans_init_ice(pj_ice_strans *ice_st,
ice_cb.on_ice_complete = &on_ice_complete;
ice_cb.on_rx_data = &ice_rx_data;
ice_cb.on_tx_pkt = &ice_tx_pkt;
@ -1863,7 +1865,7 @@ index 3cb350c2a..40ae403d0 100644
/* Create! */
status = pj_ice_sess_create(&ice_st->cfg.stun_cfg, ice_st->obj_name, role,
@@ -1282,7 +1393,8 @@ PJ_DEF(pj_status_t) pj_ice_strans_init_ice(pj_ice_strans *ice_st,
@@ -1282,7 +1394,8 @@ PJ_DEF(pj_status_t) pj_ice_strans_init_ice(pj_ice_strans *ice_st,
&cand->foundation, &cand->addr,
&cand->base_addr, &cand->rel_addr,
pj_sockaddr_get_len(&cand->addr),
@ -1873,7 +1875,7 @@ index 3cb350c2a..40ae403d0 100644
if (status != PJ_SUCCESS)
goto on_error;
}
@@ -1544,7 +1656,7 @@ pj_ice_strans_get_valid_pair(const pj_ice_strans *ice_st,
@@ -1544,7 +1657,7 @@ pj_ice_strans_get_valid_pair(const pj_ice_strans *ice_st,
PJ_DEF(pj_status_t) pj_ice_strans_stop_ice(pj_ice_strans *ice_st)
{
PJ_ASSERT_RETURN(ice_st, PJ_EINVAL);
@ -1882,7 +1884,7 @@ index 3cb350c2a..40ae403d0 100644
/* Protect with group lock, since this may cause race condition with
* pj_ice_strans_sendto2().
* See ticket #1877.
@@ -1578,7 +1690,7 @@ static pj_status_t use_buffer( pj_ice_strans *ice_st,
@@ -1578,7 +1691,7 @@ static pj_status_t use_buffer( pj_ice_strans *ice_st,
status = alloc_send_buf(ice_st, data_len);
if (status != PJ_SUCCESS)
return status;
@ -1891,7 +1893,7 @@ index 3cb350c2a..40ae403d0 100644
if (ice_st->is_pending && ice_st->empty_idx == ice_st->buf_idx) {
/* We don't use buffer or there's no more empty buffer. */
return PJ_EBUSY;
@@ -1593,12 +1705,12 @@ static pj_status_t use_buffer( pj_ice_strans *ice_st,
@@ -1593,12 +1706,12 @@ static pj_status_t use_buffer( pj_ice_strans *ice_st,
pj_sockaddr_cp(&ice_st->send_buf[idx].dst_addr, dst_addr);
ice_st->send_buf[idx].dst_addr_len = dst_addr_len;
*buffer = ice_st->send_buf[idx].buffer;
@ -1906,7 +1908,7 @@ index 3cb350c2a..40ae403d0 100644
ice_st->is_pending = PJ_TRUE;
ice_st->buf_idx = idx;
@@ -1659,16 +1771,40 @@ static pj_status_t send_data(pj_ice_strans *ice_st,
@@ -1659,16 +1772,40 @@ static pj_status_t send_data(pj_ice_strans *ice_st,
*/
if (ice_st->ice && ice_st->state == PJ_ICE_STRANS_STATE_RUNNING) {
status = pj_ice_sess_send_data(ice_st->ice, comp_id, buf, data_len);
@ -1951,7 +1953,7 @@ index 3cb350c2a..40ae403d0 100644
if (def_cand->status == PJ_SUCCESS) {
unsigned tp_idx = GET_TP_IDX(def_cand->transport_id);
@@ -1730,6 +1866,11 @@ static pj_status_t send_data(pj_ice_strans *ice_st,
@@ -1730,6 +1867,11 @@ static pj_status_t send_data(pj_ice_strans *ice_st,
status = pj_stun_sock_sendto(comp->stun[tp_idx].sock, NULL, buf,
(unsigned)data_len, 0, dest_addr,
dest_addr_len);
@ -1963,7 +1965,23 @@ index 3cb350c2a..40ae403d0 100644
goto on_return;
}
@@ -1771,7 +1912,7 @@ PJ_DEF(pj_status_t) pj_ice_strans_sendto( pj_ice_strans *ice_st,
@@ -1738,8 +1880,14 @@ static pj_status_t send_data(pj_ice_strans *ice_st,
on_return:
/* We continue later in on_data_sent() callback. */
- if (status == PJ_EPENDING)
+ if (status == PJ_EPENDING) {
+ ice_st->last_data_len = data_len;
+ if (add_header) {
+ // Don't forget the header
+ ice_st->last_data_len += sizeof(pj_uint16_t);
+ }
return status;
+ }
if (call_cb) {
on_data_sent(ice_st, (status == PJ_SUCCESS? data_len: -status));
@@ -1771,7 +1919,7 @@ PJ_DEF(pj_status_t) pj_ice_strans_sendto( pj_ice_strans *ice_st,
dst_addr_len, PJ_TRUE, PJ_FALSE);
if (status == PJ_EPENDING)
status = PJ_SUCCESS;
@ -1972,7 +1990,7 @@ index 3cb350c2a..40ae403d0 100644
return status;
}
#endif
@@ -1842,7 +1983,22 @@ static void on_ice_complete(pj_ice_sess *ice, pj_status_t status)
@@ -1842,7 +1990,22 @@ static void on_ice_complete(pj_ice_sess *ice, pj_status_t status)
sizeof(lip), 3);
pj_sockaddr_print(&check->rcand->addr, rip,
sizeof(rip), 3);
@ -1996,7 +2014,7 @@ index 3cb350c2a..40ae403d0 100644
if (tp_typ == TP_TURN) {
/* Activate channel binding for the remote address
* for more efficient data transfer using TURN.
@@ -1936,6 +2092,29 @@ static pj_status_t ice_tx_pkt(pj_ice_sess *ice,
@@ -1936,6 +2099,29 @@ static pj_status_t ice_tx_pkt(pj_ice_sess *ice,
pj_sockaddr_get_port(dst_addr),
tp_typ));
@ -2026,7 +2044,7 @@ index 3cb350c2a..40ae403d0 100644
if (tp_typ == TP_TURN) {
if (comp->turn[tp_idx].sock) {
status = pj_turn_sock_sendto(comp->turn[tp_idx].sock,
@@ -1958,7 +2137,7 @@ static pj_status_t ice_tx_pkt(pj_ice_sess *ice,
@@ -1958,7 +2144,7 @@ static pj_status_t ice_tx_pkt(pj_ice_sess *ice,
if (status != PJ_SUCCESS) {
goto on_return;
}
@ -2035,7 +2053,7 @@ index 3cb350c2a..40ae403d0 100644
pj_sockaddr_cp(&comp->dst_addr, dst_addr);
comp->synth_addr_len = pj_sockaddr_get_len(&comp->synth_addr);
}
@@ -1969,9 +2148,13 @@ static pj_status_t ice_tx_pkt(pj_ice_sess *ice,
@@ -1969,9 +2155,13 @@ static pj_status_t ice_tx_pkt(pj_ice_sess *ice,
dest_addr_len = dst_addr_len;
}
@ -2052,7 +2070,7 @@ index 3cb350c2a..40ae403d0 100644
} else {
pj_assert(!"Invalid transport ID");
status = PJ_EINVALIDOP;
@@ -2017,7 +2200,7 @@ static void check_pending_send(pj_ice_strans *ice_st)
@@ -2017,7 +2207,7 @@ static void check_pending_send(pj_ice_strans *ice_st)
if (ice_st->num_buf > 0)
ice_st->buf_idx = (ice_st->buf_idx + 1) % ice_st->num_buf;
@ -2061,7 +2079,7 @@ index 3cb350c2a..40ae403d0 100644
if (ice_st->num_buf > 0 && ice_st->buf_idx != ice_st->empty_idx) {
/* There's some pending send. Send it one by one. */
pending_send *ps = &ice_st->send_buf[ice_st->buf_idx];
@@ -2031,6 +2214,237 @@ static void check_pending_send(pj_ice_strans *ice_st)
@@ -2031,6 +2221,237 @@ static void check_pending_send(pj_ice_strans *ice_st)
}
}
@ -2299,7 +2317,17 @@ index 3cb350c2a..40ae403d0 100644
/* Notifification when asynchronous send operation via STUN/TURN
* has completed.
*/
@@ -2196,7 +2610,7 @@ static pj_bool_t stun_on_status(pj_stun_sock *stun_sock,
@@ -2039,7 +2460,8 @@ static pj_bool_t on_data_sent(pj_ice_strans *ice_st, pj_ssize_t sent)
if (ice_st->destroy_req || !ice_st->is_pending)
return PJ_TRUE;
- if (ice_st->call_send_cb && ice_st->cb.on_data_sent) {
+ if (ice_st->call_send_cb && ice_st->cb.on_data_sent
+ && sent == ice_st->last_data_len /* Only app data should be announced */) {
(*ice_st->cb.on_data_sent)(ice_st, sent);
}
@@ -2196,7 +2618,7 @@ static pj_bool_t stun_on_status(pj_stun_sock *stun_sock,
{
/* We get an IPv4 mapped address for our IPv6
* host address.
@ -2308,7 +2336,7 @@ index 3cb350c2a..40ae403d0 100644
comp->ipv4_mapped = PJ_TRUE;
/* Find other host candidates with the same (IPv6)
@@ -2208,7 +2622,7 @@ static pj_bool_t stun_on_status(pj_stun_sock *stun_sock,
@@ -2208,7 +2630,7 @@ static pj_bool_t stun_on_status(pj_stun_sock *stun_sock,
if (comp->cand_list[i].type != PJ_ICE_CAND_TYPE_HOST)
continue;
@ -2317,7 +2345,7 @@ index 3cb350c2a..40ae403d0 100644
a1 = &comp->cand_list[i].addr;
a2 = &cand->base_addr;
if (pj_memcmp(pj_sockaddr_get_addr(a1),
@@ -2225,7 +2639,7 @@ static pj_bool_t stun_on_status(pj_stun_sock *stun_sock,
@@ -2225,7 +2647,7 @@ static pj_bool_t stun_on_status(pj_stun_sock *stun_sock,
pj_sockaddr_cp(&cand->base_addr, &info.mapped_addr);
pj_sockaddr_cp(&cand->rel_addr, &info.mapped_addr);
}
@ -2326,7 +2354,7 @@ index 3cb350c2a..40ae403d0 100644
/* Eliminate the srflx candidate if the address is
* equal to other (host) candidates.
*/
@@ -2268,11 +2682,11 @@ static pj_bool_t stun_on_status(pj_stun_sock *stun_sock,
@@ -2268,11 +2690,11 @@ static pj_bool_t stun_on_status(pj_stun_sock *stun_sock,
sizeof(ipaddr), 3)));
sess_init_update(ice_st);
@ -2340,7 +2368,7 @@ index 3cb350c2a..40ae403d0 100644
PJ_ICE_STRANS_OP_ADDR_CHANGE,
status);
}
@@ -2318,6 +2732,10 @@ static pj_bool_t stun_on_status(pj_stun_sock *stun_sock,
@@ -2318,6 +2740,10 @@ static pj_bool_t stun_on_status(pj_stun_sock *stun_sock,
}
}
break;
@ -2351,7 +2379,7 @@ index 3cb350c2a..40ae403d0 100644
}
return pj_grp_lock_dec_ref(ice_st->grp_lock)? PJ_FALSE : PJ_TRUE;
@@ -2358,14 +2776,103 @@ static void turn_on_rx_data(pj_turn_sock *turn_sock,
@@ -2358,14 +2784,103 @@ static void turn_on_rx_data(pj_turn_sock *turn_sock,
} else {
/* Hand over the packet to ICE */
@ -4128,3 +4156,5 @@ index 474a8d07c..9257f07a4 100644
NULL, sess, &sess->stun_sock);
if (status != PJ_SUCCESS) {
char errmsg[PJ_ERR_MSG_SIZE];
--
2.25.2

View File

@ -185,7 +185,7 @@ public:
bool handleEvents(unsigned max_msec);
// Wait data on components
pj_ssize_t lastReadLen_;
pj_ssize_t lastSentLen_;
std::condition_variable waitDataCv_ = {};
onShutdownCb scb;
@ -340,7 +340,8 @@ IceTransport::Impl::Impl(const char* name, int component_count, bool master,
icecb.on_data_sent = [](pj_ice_strans* ice_st, pj_ssize_t size) {
if (auto* tr = static_cast<Impl*>(pj_ice_strans_get_user_data(ice_st))) {
tr->lastReadLen_ = size;
std::lock_guard<std::mutex> lk(tr->iceMutex_);
tr->lastSentLen_ += size;
tr->waitDataCv_.notify_all();
} else
JAMI_WARN("null IceTransport");
@ -1309,17 +1310,15 @@ IceTransport::send(int comp_id, const unsigned char* buf, size_t len)
errno = EINVAL;
return -1;
}
pj_ssize_t sent_size = 0;
auto status = pj_ice_strans_sendto2(pimpl_->icest_.get(), comp_id+1, buf, len, remote.pjPtr(), remote.getLength());
if (status == PJ_EPENDING && isTCPEnabled()) {
auto current_size = sent_size;
// NOTE; because we are in TCP, the sent size will count the header (2
// bytes length).
while (current_size < static_cast<pj_ssize_t>(len)) {
std::unique_lock<std::mutex> lk(pimpl_->iceMutex_);
pimpl_->waitDataCv_.wait(lk);
current_size = pimpl_->lastReadLen_;
}
std::unique_lock<std::mutex> lk(pimpl_->iceMutex_);
pimpl_->waitDataCv_.wait(lk, [&]{
return pimpl_->lastSentLen_ >= len;
});
pimpl_->lastSentLen_ = 0;
} else if (status != PJ_SUCCESS && status != PJ_EPENDING) {
if (status == PJ_EBUSY) {
errno = EAGAIN;