Ignore:
Timestamp:
Mar 11, 2016 4:17:32 AM (9 years ago)
Author:
nanang
Message:

Re #1900:

  • Works on UWP socket & ioqueue.
  • Media transport UDP: cancel any pending send on detach, otherwise there is possibility that send buffer is already freed by application (stream) when the send op starts.
  • Ioqueue common abs: rename 'generic' as it seems to be a keyword in C++/CX, fixed #if/#endif possition in ioqueue_init_key().
  • pjsua GUI app: fixed thread registration status check.
File:
1 edited

Legend:

Unmodified
Added
Removed
  • pjproject/branches/projects/uwp/pjlib/src/pj/sock_uwp.cpp

    r5254 r5256  
    1717 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA  
    1818 */ 
    19 #include <pj/sock.h> 
    20 #include <pj/addr_resolv.h> 
    2119#include <pj/assert.h> 
    2220#include <pj/errno.h> 
    2321#include <pj/math.h> 
    2422#include <pj/os.h> 
    25 #include <pj/string.h> 
    26 #include <pj/unicode.h> 
    2723#include <pj/compat/socket.h> 
    2824 
     
    3026#include <string> 
    3127 
     28#define THIS_FILE       "sock_uwp.cpp" 
     29 
    3230#include "sock_uwp.h" 
    33  
    34 #define THIS_FILE       "sock_uwp.cpp" 
    3531 
    3632 /* 
     
    201197    { 
    202198        try { 
     199            if (uwp_sock->sock_state >= SOCKSTATE_DISCONNECTED) 
     200                return; 
     201 
    203202            recv_args = args; 
    204203            avail_data_len = args->GetDataReader()->UnconsumedBufferLength; 
    205204 
    206             // Notify application asynchronously 
    207             concurrency::create_task([this]() 
    208             { 
    209                 if (uwp_sock->on_read) { 
    210                     if (!pj_thread_is_registered()) 
    211                         pj_thread_register("MsgReceive", thread_desc,  
    212                                            &rec_thread); 
    213  
    214                     (tp)(*uwp_sock->read_userdata) 
    215                     (*uwp_sock->on_read)(uwp_sock, avail_data_len); 
    216                 } 
    217             }); 
     205            if (uwp_sock->cb.on_read) { 
     206                (*uwp_sock->cb.on_read)(uwp_sock, avail_data_len); 
     207            } 
    218208 
    219209            WaitForSingleObjectEx(recv_wait, INFINITE, false); 
    220         } catch (Exception^ e) {} 
     210        } catch (...) {} 
    221211    } 
    222212 
     
    265255    HANDLE recv_wait; 
    266256    int avail_data_len; 
    267     pj_thread_desc thread_desc; 
    268     pj_thread_t *rec_thread; 
    269257}; 
    270258 
     
    288276            conn_args = args; 
    289277 
    290             // Notify application asynchronously 
    291             concurrency::create_task([this]() 
    292             { 
    293                 if (uwp_sock->on_accept) { 
    294                     if (!pj_thread_is_registered()) 
    295                         pj_thread_register("ConnReceive", thread_desc,  
    296                                            &listener_thread); 
    297  
    298                     (*uwp_sock->on_accept)(uwp_sock, PJ_SUCCESS); 
    299                 } 
    300             }); 
     278            if (uwp_sock->cb.on_accept) { 
     279                (*uwp_sock->cb.on_accept)(uwp_sock); 
     280            } 
    301281 
    302282            WaitForSingleObjectEx(conn_wait, INFINITE, false); 
     
    304284    } 
    305285 
    306     pj_status_t GetAcceptedSocket(StreamSocket^ stream_sock) 
     286    pj_status_t GetAcceptedSocket(StreamSocket^& stream_sock) 
    307287    { 
    308288        if (conn_args == nullptr) 
     
    333313    EventRegistrationToken event_token; 
    334314    HANDLE conn_wait; 
    335  
    336     pj_thread_desc thread_desc; 
    337     pj_thread_t *listener_thread; 
    338315}; 
    339316 
     
    341318PjUwpSocket::PjUwpSocket(int af_, int type_, int proto_) : 
    342319    af(af_), type(type_), proto(proto_), 
    343     sock_type(SOCKTYPE_UNKNOWN), sock_state(SOCKSTATE_NULL), 
    344     is_blocking(PJ_TRUE), is_busy_sending(PJ_FALSE) 
     320    sock_type(SOCKTYPE_UNKNOWN), 
     321    sock_state(SOCKSTATE_NULL), 
     322    is_blocking(PJ_TRUE), 
     323    has_pending_bind(PJ_FALSE), 
     324    has_pending_send(PJ_FALSE), 
     325    has_pending_recv(PJ_FALSE) 
    345326{ 
    346327    pj_sockaddr_init(pj_AF_INET(), &local_addr, NULL, 0); 
     
    349330 
    350331PjUwpSocket::~PjUwpSocket() 
    351 {} 
     332{ 
     333    DeinitSocket(); 
     334} 
    352335 
    353336PjUwpSocket* PjUwpSocket::CreateAcceptSocket(Windows::Networking::Sockets::StreamSocket^ stream_sock_) 
     
    359342    new_sock->socket_reader = ref new DataReader(new_sock->stream_sock->InputStream); 
    360343    new_sock->socket_writer = ref new DataWriter(new_sock->stream_sock->OutputStream); 
     344    new_sock->socket_reader->InputStreamOptions = InputStreamOptions::Partial; 
    361345    new_sock->send_buffer = ref new Buffer(SEND_BUFFER_SIZE); 
    362346    new_sock->is_blocking = is_blocking; 
     
    399383} 
    400384 
     385 
     386void PjUwpSocket::DeinitSocket() 
     387{ 
     388    if (stream_sock) { 
     389        concurrency::create_task(stream_sock->CancelIOAsync()).wait(); 
     390    } 
     391    if (datagram_sock) { 
     392        concurrency::create_task(datagram_sock->CancelIOAsync()).wait(); 
     393    } 
     394    if (listener_sock) { 
     395        concurrency::create_task(listener_sock->CancelIOAsync()).wait(); 
     396    } 
     397    stream_sock = nullptr; 
     398    datagram_sock = nullptr; 
     399    dgram_recv_helper = nullptr; 
     400    listener_sock = nullptr; 
     401    listener_helper = nullptr; 
     402    socket_writer = nullptr; 
     403    socket_reader = nullptr; 
     404    sock_state = SOCKSTATE_NULL; 
     405} 
     406 
     407pj_status_t PjUwpSocket::Bind(const pj_sockaddr_t *addr) 
     408{ 
     409    /* Not initialized yet, socket type is perhaps TCP, just not decided yet 
     410     * whether it is a stream or a listener. 
     411     */ 
     412    if (sock_state < SOCKSTATE_INITIALIZED) { 
     413        pj_sockaddr_cp(&local_addr, addr); 
     414        has_pending_bind = PJ_TRUE; 
     415        return PJ_SUCCESS; 
     416    } 
     417     
     418    PJ_ASSERT_RETURN(sock_state == SOCKSTATE_INITIALIZED, PJ_EINVALIDOP); 
     419    if (sock_type != SOCKTYPE_DATAGRAM && sock_type != SOCKTYPE_LISTENER) 
     420        return PJ_EINVALIDOP; 
     421 
     422    if (has_pending_bind) { 
     423        has_pending_bind = PJ_FALSE; 
     424        if (!addr) 
     425            addr = &local_addr; 
     426    } 
     427 
     428    /* If no bound address is set, just return */ 
     429    if (!pj_sockaddr_has_addr(addr) && !pj_sockaddr_get_port(addr)) 
     430        return PJ_SUCCESS; 
     431 
     432    if (addr != &local_addr) 
     433        pj_sockaddr_cp(&local_addr, addr); 
     434 
     435    HRESULT err = 0; 
     436    try { 
     437        concurrency::create_task([this, addr]() { 
     438            HostName ^host; 
     439            int port; 
     440            sockaddr_to_hostname_port(addr, host, &port); 
     441            if (pj_sockaddr_has_addr(addr)) { 
     442                if (sock_type == SOCKTYPE_DATAGRAM) 
     443                    return datagram_sock->BindEndpointAsync(host, port.ToString()); 
     444                else 
     445                    return listener_sock->BindEndpointAsync(host, port.ToString()); 
     446            } else /* if (pj_sockaddr_get_port(addr) != 0) */ { 
     447                if (sock_type == SOCKTYPE_DATAGRAM) 
     448                    return datagram_sock->BindServiceNameAsync(port.ToString()); 
     449                else 
     450                    return listener_sock->BindServiceNameAsync(port.ToString()); 
     451            } 
     452        }).then([this, &err](concurrency::task<void> t) 
     453        { 
     454            try { 
     455                t.get(); 
     456            } catch (Exception^ e) { 
     457                err = e->HResult; 
     458            } 
     459        }).get(); 
     460    } catch (Exception^ e) { 
     461        err = e->HResult; 
     462    } 
     463 
     464    return (err? PJ_RETURN_OS_ERROR(err) : PJ_SUCCESS); 
     465} 
     466 
     467 
     468pj_status_t PjUwpSocket::SendImp(const void *buf, pj_ssize_t *len) 
     469{ 
     470    if (has_pending_send) 
     471        return PJ_RETURN_OS_ERROR(PJ_BLOCKING_ERROR_VAL); 
     472 
     473    if (*len > (pj_ssize_t)send_buffer->Capacity) 
     474        return PJ_ETOOBIG; 
     475 
     476    CopyToIBuffer((unsigned char*)buf, *len, send_buffer); 
     477    send_buffer->Length = *len; 
     478    socket_writer->WriteBuffer(send_buffer); 
     479 
     480    /* Blocking version */ 
     481    if (is_blocking) { 
     482        pj_status_t status = PJ_SUCCESS; 
     483        concurrency::cancellation_token_source cts; 
     484        auto cts_token = cts.get_token(); 
     485        auto t = concurrency::create_task(socket_writer->StoreAsync(), 
     486                                          cts_token); 
     487        *len = cancel_after_timeout(t, cts, (unsigned int)WRITE_TIMEOUT). 
     488            then([cts_token, &status](concurrency::task<unsigned int> t_) 
     489        { 
     490            int sent = 0; 
     491            try { 
     492                if (cts_token.is_canceled()) 
     493                    status = PJ_ETIMEDOUT; 
     494                else 
     495                    sent = t_.get(); 
     496            } catch (Exception^ e) { 
     497                status = PJ_RETURN_OS_ERROR(e->HResult); 
     498            } 
     499            return sent; 
     500        }).get(); 
     501 
     502        return status; 
     503    }  
     504 
     505    /* Non-blocking version */ 
     506    has_pending_send = PJ_TRUE; 
     507    concurrency::create_task(socket_writer->StoreAsync()). 
     508        then([this](concurrency::task<unsigned int> t_) 
     509    { 
     510        try { 
     511            unsigned int l = t_.get(); 
     512            has_pending_send = PJ_FALSE; 
     513 
     514            // invoke callback 
     515            if (cb.on_write) { 
     516                (*cb.on_write)(this, l); 
     517            } 
     518        } catch (...) { 
     519            has_pending_send = PJ_FALSE; 
     520            sock_state = SOCKSTATE_ERROR; 
     521            DeinitSocket(); 
     522 
     523            // invoke callback 
     524            if (cb.on_write) { 
     525                (*cb.on_write)(this, -PJ_EUNKNOWN); 
     526            } 
     527        } 
     528    }); 
     529 
     530    return PJ_SUCCESS; 
     531} 
     532 
     533 
     534pj_status_t PjUwpSocket::Send(const void *buf, pj_ssize_t *len) 
     535{ 
     536    if ((sock_type!=SOCKTYPE_STREAM && sock_type!=SOCKTYPE_DATAGRAM) || 
     537        (sock_state!=SOCKSTATE_CONNECTED)) 
     538    { 
     539        return PJ_EINVALIDOP; 
     540    } 
     541 
     542    /* Sending for SOCKTYPE_DATAGRAM is implemented in pj_sock_sendto() */ 
     543    if (sock_type == SOCKTYPE_DATAGRAM) { 
     544        return SendTo(buf, len, &remote_addr); 
     545    } 
     546 
     547    return SendImp(buf, len); 
     548} 
     549 
     550 
     551pj_status_t PjUwpSocket::SendTo(const void *buf, pj_ssize_t *len, 
     552                                const pj_sockaddr_t *to) 
     553{ 
     554    if (sock_type != SOCKTYPE_DATAGRAM || sock_state < SOCKSTATE_INITIALIZED 
     555        || sock_state >= SOCKSTATE_DISCONNECTED) 
     556    { 
     557        return PJ_EINVALIDOP; 
     558    } 
     559 
     560    if (has_pending_send) 
     561        return PJ_RETURN_OS_ERROR(PJ_BLOCKING_ERROR_VAL); 
     562 
     563    if (*len > (pj_ssize_t)send_buffer->Capacity) 
     564        return PJ_ETOOBIG; 
     565 
     566    HostName ^hostname; 
     567    int port; 
     568    sockaddr_to_hostname_port(to, hostname, &port); 
     569 
     570    concurrency::cancellation_token_source cts; 
     571    auto cts_token = cts.get_token(); 
     572    auto t = concurrency::create_task(datagram_sock->GetOutputStreamAsync( 
     573                                      hostname, port.ToString()), cts_token); 
     574    pj_status_t status = PJ_SUCCESS; 
     575 
     576    cancel_after_timeout(t, cts, (unsigned int)WRITE_TIMEOUT). 
     577        then([this, cts_token, &status](concurrency::task<IOutputStream^> t_) 
     578    { 
     579        try { 
     580            if (cts_token.is_canceled()) { 
     581                status = PJ_ETIMEDOUT; 
     582            } else { 
     583                IOutputStream^ outstream = t_.get(); 
     584                socket_writer = ref new DataWriter(outstream); 
     585            } 
     586        } catch (Exception^ e) { 
     587            status = PJ_RETURN_OS_ERROR(e->HResult); 
     588        } 
     589    }).get(); 
     590 
     591    if (status != PJ_SUCCESS) 
     592        return status; 
     593 
     594    status = SendImp(buf, len); 
     595    if ((status == PJ_SUCCESS || status == PJ_EPENDING) && 
     596        sock_state < SOCKSTATE_CONNECTED) 
     597    { 
     598        sock_state = SOCKSTATE_CONNECTED; 
     599    } 
     600 
     601    return status; 
     602} 
     603 
     604 
     605int PjUwpSocket::ConsumeReadBuffer(void *buf, int max_len) 
     606{ 
     607    if (socket_reader->UnconsumedBufferLength == 0) 
     608        return 0; 
     609 
     610    int read_len = PJ_MIN((int)socket_reader->UnconsumedBufferLength,max_len); 
     611    IBuffer^ buffer = socket_reader->ReadBuffer(read_len); 
     612    read_len = buffer->Length; 
     613    CopyFromIBuffer((unsigned char*)buf, read_len, buffer); 
     614    return read_len; 
     615} 
     616 
     617 
     618pj_status_t PjUwpSocket::Recv(void *buf, pj_ssize_t *len) 
     619{ 
     620    /* Only for TCP, at least for now! */ 
     621    if (sock_type == SOCKTYPE_DATAGRAM) 
     622        return PJ_ENOTSUP; 
     623 
     624    if (sock_type != SOCKTYPE_STREAM || sock_state != SOCKSTATE_CONNECTED) 
     625        return PJ_EINVALIDOP; 
     626 
     627    if (has_pending_recv) 
     628        return PJ_RETURN_OS_ERROR(PJ_BLOCKING_ERROR_VAL); 
     629 
     630    /* First check if there is already some data in the read buffer */ 
     631    if (buf) { 
     632        int avail_len = ConsumeReadBuffer(buf, *len); 
     633        if (avail_len > 0) { 
     634            *len = avail_len; 
     635            return PJ_SUCCESS; 
     636        } 
     637    } 
     638 
     639    /* Blocking version */ 
     640    if (is_blocking) { 
     641        pj_status_t status = PJ_SUCCESS; 
     642        concurrency::cancellation_token_source cts; 
     643        auto cts_token = cts.get_token(); 
     644        auto t = concurrency::create_task(socket_reader->LoadAsync(*len), 
     645                                            cts_token); 
     646        *len = cancel_after_timeout(t, cts, READ_TIMEOUT) 
     647            .then([this, len, buf, cts_token, &status] 
     648                                    (concurrency::task<unsigned int> t_) 
     649        { 
     650            try { 
     651                if (cts_token.is_canceled()) { 
     652                    status = PJ_ETIMEDOUT; 
     653                    return 0; 
     654                } 
     655                t_.get(); 
     656            } catch (Exception^) { 
     657                status = PJ_ETIMEDOUT; 
     658                return 0; 
     659            } 
     660 
     661            *len = ConsumeReadBuffer(buf, *len); 
     662            return (int)*len; 
     663        }).get(); 
     664 
     665        return status; 
     666    } 
     667 
     668    /* Non-blocking version */ 
     669 
     670    has_pending_recv = PJ_TRUE; 
     671    concurrency::create_task(socket_reader->LoadAsync(*len)) 
     672        .then([this](concurrency::task<unsigned int> t_) 
     673    { 
     674        try { 
     675            // catch any exception 
     676            t_.get(); 
     677            has_pending_recv = PJ_FALSE; 
     678 
     679            // invoke callback 
     680            int read_len = socket_reader->UnconsumedBufferLength; 
     681            if (read_len > 0 && cb.on_read) { 
     682                (*cb.on_read)(this, read_len); 
     683            } 
     684        } catch (Exception^ e) { 
     685            has_pending_recv = PJ_FALSE; 
     686 
     687            // invoke callback 
     688            if (cb.on_read) { 
     689                (*cb.on_read)(this, -PJ_EUNKNOWN); 
     690            } 
     691        } 
     692    }); 
     693 
     694    return PJ_RETURN_OS_ERROR(PJ_BLOCKING_ERROR_VAL); 
     695} 
     696 
     697 
     698pj_status_t PjUwpSocket::RecvFrom(void *buf, pj_ssize_t *len, 
     699                                  pj_sockaddr_t *from) 
     700{ 
     701    if (sock_type != SOCKTYPE_DATAGRAM || sock_state < SOCKSTATE_INITIALIZED 
     702        || sock_state >= SOCKSTATE_DISCONNECTED) 
     703    { 
     704        return PJ_EINVALIDOP; 
     705    } 
     706 
     707    /* Start receive, if not yet */ 
     708    if (dgram_recv_helper == nullptr) { 
     709        dgram_recv_helper = ref new PjUwpSocketDatagramRecvHelper(this); 
     710    } 
     711 
     712    /* Try to read any available data first */ 
     713    if (buf || is_blocking) { 
     714        pj_status_t status; 
     715        status = dgram_recv_helper->ReadDataIfAvailable(buf, len, from); 
     716        if (status != PJ_ENOTFOUND) 
     717            return status; 
     718    } 
     719 
     720    /* Blocking version */ 
     721    if (is_blocking) { 
     722        int max_loop = 0; 
     723        pj_status_t status = PJ_ENOTFOUND; 
     724        while (status == PJ_ENOTFOUND && sock_state <= SOCKSTATE_CONNECTED) 
     725        { 
     726            status = dgram_recv_helper->ReadDataIfAvailable(buf, len, from); 
     727            if (status != PJ_SUCCESS) 
     728                pj_thread_sleep(100); 
     729 
     730            if (++max_loop > 10) 
     731                return PJ_ETIMEDOUT; 
     732        } 
     733        return status; 
     734    } 
     735 
     736    /* For non-blocking version, just return PJ_EPENDING */ 
     737    return PJ_RETURN_OS_ERROR(PJ_BLOCKING_ERROR_VAL); 
     738} 
     739 
     740 
     741pj_status_t PjUwpSocket::Connect(const pj_sockaddr_t *addr) 
     742{ 
     743    pj_status_t status; 
     744 
     745    PJ_ASSERT_RETURN((sock_type == SOCKTYPE_UNKNOWN && sock_state == SOCKSTATE_NULL) || 
     746                     (sock_type == SOCKTYPE_DATAGRAM && sock_state == SOCKSTATE_INITIALIZED), 
     747                     PJ_EINVALIDOP); 
     748 
     749    if (sock_type == SOCKTYPE_UNKNOWN) { 
     750        InitSocket(SOCKTYPE_STREAM); 
     751        // No need to check pending bind, no bind for TCP client socket 
     752    } 
     753 
     754    pj_sockaddr_cp(&remote_addr, addr); 
     755 
     756    auto t = concurrency::create_task([this, addr]() 
     757    { 
     758        HostName ^hostname; 
     759        int port; 
     760        sockaddr_to_hostname_port(&remote_addr, hostname, &port); 
     761        if (sock_type == SOCKTYPE_STREAM) 
     762            return stream_sock->ConnectAsync(hostname, port.ToString(), 
     763                                      SocketProtectionLevel::PlainSocket); 
     764        else 
     765            return datagram_sock->ConnectAsync(hostname, port.ToString()); 
     766    }).then([=](concurrency::task<void> t_) 
     767    { 
     768        try { 
     769            t_.get(); 
     770 
     771            sock_state = SOCKSTATE_CONNECTED; 
     772 
     773            // Update local & remote address 
     774            HostName^ local_address; 
     775            String^ local_port; 
     776 
     777            if (sock_type == SOCKTYPE_STREAM) { 
     778                local_address = stream_sock->Information->LocalAddress; 
     779                local_port = stream_sock->Information->LocalPort; 
     780 
     781                socket_reader = ref new DataReader(stream_sock->InputStream); 
     782                socket_writer = ref new DataWriter(stream_sock->OutputStream); 
     783                socket_reader->InputStreamOptions = InputStreamOptions::Partial; 
     784            } else { 
     785                local_address = datagram_sock->Information->LocalAddress; 
     786                local_port = datagram_sock->Information->LocalPort; 
     787            } 
     788            if (local_address && local_port) { 
     789                wstr_addr_to_sockaddr(local_address->CanonicalName->Data(), 
     790                    local_port->Data(), 
     791                    &local_addr); 
     792            } 
     793 
     794            if (!is_blocking && cb.on_connect) { 
     795                (*cb.on_connect)(this, PJ_SUCCESS); 
     796            } 
     797            return (pj_status_t)PJ_SUCCESS; 
     798 
     799        } catch (Exception^ ex) { 
     800 
     801            SocketErrorStatus status = SocketError::GetStatus(ex->HResult); 
     802 
     803            switch (status) 
     804            { 
     805            case SocketErrorStatus::UnreachableHost: 
     806                break; 
     807            case SocketErrorStatus::ConnectionTimedOut: 
     808                break; 
     809            case SocketErrorStatus::ConnectionRefused: 
     810                break; 
     811            default: 
     812                break; 
     813            } 
     814 
     815            if (!is_blocking && cb.on_connect) { 
     816                (*cb.on_connect)(this, PJ_EUNKNOWN); 
     817            } 
     818 
     819            return (pj_status_t)PJ_EUNKNOWN; 
     820        } 
     821    }); 
     822 
     823    if (!is_blocking) 
     824        return PJ_RETURN_OS_ERROR(PJ_BLOCKING_CONNECT_ERROR_VAL); 
     825 
     826    try { 
     827        status = t.get(); 
     828    } catch (Exception^) { 
     829        return PJ_EUNKNOWN; 
     830    } 
     831    return status; 
     832} 
     833 
     834pj_status_t PjUwpSocket::Listen() 
     835{ 
     836    PJ_ASSERT_RETURN((sock_type == SOCKTYPE_UNKNOWN) || 
     837                     (sock_type == SOCKTYPE_LISTENER && 
     838                      sock_state == SOCKSTATE_INITIALIZED), 
     839                     PJ_EINVALIDOP); 
     840 
     841    if (sock_type == SOCKTYPE_UNKNOWN) 
     842        InitSocket(SOCKTYPE_LISTENER); 
     843 
     844    if (has_pending_bind) 
     845        Bind(); 
     846 
     847    /* Start listen */ 
     848    if (listener_helper == nullptr) { 
     849        listener_helper = ref new PjUwpSocketListenerHelper(this); 
     850    } 
     851 
     852    return PJ_SUCCESS; 
     853} 
     854 
     855pj_status_t PjUwpSocket::Accept(PjUwpSocket **new_sock) 
     856{ 
     857    if (sock_type != SOCKTYPE_LISTENER || sock_state != SOCKSTATE_INITIALIZED) 
     858        return PJ_EINVALIDOP; 
     859 
     860    StreamSocket^ accepted_sock; 
     861    pj_status_t status = listener_helper->GetAcceptedSocket(accepted_sock); 
     862    if (status == PJ_ENOTFOUND) 
     863        return PJ_RETURN_OS_ERROR(PJ_BLOCKING_ERROR_VAL); 
     864 
     865    if (status != PJ_SUCCESS) 
     866        return status; 
     867 
     868    *new_sock = CreateAcceptSocket(accepted_sock); 
     869    return PJ_SUCCESS; 
     870} 
    401871 
    402872 
     
    7181188    PJ_ASSERT_RETURN(sock, PJ_EINVAL); 
    7191189    PJ_ASSERT_RETURN(addr && len>=(int)sizeof(pj_sockaddr_in), PJ_EINVAL); 
    720  
    7211190    PjUwpSocket *s = (PjUwpSocket*)sock; 
    722  
    723     if (s->sock_state > SOCKSTATE_INITIALIZED) 
    724         return PJ_EINVALIDOP; 
    725  
    726     pj_sockaddr_cp(&s->local_addr, addr); 
    727  
    728     /* Bind now if this is UDP. But if it is TCP, unfortunately we don't 
    729      * know yet whether it is SocketStream or Listener! 
    730      */ 
    731     if (s->type == pj_SOCK_DGRAM()) { 
    732         HRESULT err = 0; 
    733  
    734         try { 
    735             concurrency::create_task([s, addr]() { 
    736                 HostName ^hostname; 
    737                 int port; 
    738                 sockaddr_to_hostname_port(addr, hostname, &port); 
    739                 if (pj_sockaddr_has_addr(addr)) { 
    740                     s->datagram_sock->BindEndpointAsync(hostname,  
    741                                                         port.ToString()); 
    742                 } else if (pj_sockaddr_get_port(addr) != 0) { 
    743                     s->datagram_sock->BindServiceNameAsync(port.ToString()); 
    744                 } 
    745             }).then([s, &err](concurrency::task<void> t) 
    746             { 
    747                 try { 
    748                     t.get(); 
    749                     s->sock_state = SOCKSTATE_CONNECTED; 
    750                 } catch (Exception^ e) { 
    751                     err = e->HResult; 
    752                 } 
    753             }).get(); 
    754         } catch (Exception^ e) { 
    755             err = e->HResult; 
    756         } 
    757  
    758         return (err? PJ_RETURN_OS_ERROR(err) : PJ_SUCCESS); 
    759     } 
    760  
    761     return PJ_SUCCESS; 
     1191    return s->Bind(addr); 
    7621192} 
    7631193 
     
    8121242 
    8131243    PjUwpSocket *s = (PjUwpSocket*)sock; 
    814  
    815     pj_sockaddr_cp(addr, &s->remote_addr); 
    816     *namelen = pj_sockaddr_get_len(&s->remote_addr); 
     1244    pj_sockaddr_cp(addr, s->GetRemoteAddr()); 
     1245    *namelen = pj_sockaddr_get_len(addr); 
    8171246 
    8181247    return PJ_SUCCESS; 
     
    8311260 
    8321261    PjUwpSocket *s = (PjUwpSocket*)sock; 
    833  
    834     pj_sockaddr_cp(addr, &s->local_addr); 
    835     *namelen = pj_sockaddr_get_len(&s->local_addr); 
     1262    pj_sockaddr_cp(addr, s->GetLocalAddr()); 
     1263    *namelen = pj_sockaddr_get_len(addr); 
    8361264 
    8371265    return PJ_SUCCESS; 
    8381266} 
    8391267 
    840  
    841 static pj_status_t sock_send_imp(PjUwpSocket *s, const void *buf, 
    842                                  pj_ssize_t *len) 
    843 { 
    844     if (s->is_busy_sending) 
    845         return PJ_STATUS_FROM_OS(OSERR_EWOULDBLOCK); 
    846  
    847     if (*len > (pj_ssize_t)s->send_buffer->Capacity) 
    848         return PJ_ETOOBIG; 
    849  
    850     CopyToIBuffer((unsigned char*)buf, *len, s->send_buffer); 
    851     s->send_buffer->Length = *len; 
    852     s->socket_writer->WriteBuffer(s->send_buffer); 
    853  
    854     if (s->is_blocking) { 
    855         pj_status_t status = PJ_SUCCESS; 
    856         concurrency::cancellation_token_source cts; 
    857         auto cts_token = cts.get_token(); 
    858         auto t = concurrency::create_task(s->socket_writer->StoreAsync(), 
    859                                           cts_token); 
    860         *len = cancel_after_timeout(t, cts, (unsigned int)WRITE_TIMEOUT). 
    861             then([cts_token, &status](concurrency::task<unsigned int> t_) 
    862         { 
    863             int sent = 0; 
    864             try { 
    865                 if (cts_token.is_canceled()) 
    866                     status = PJ_ETIMEDOUT; 
    867                 else 
    868                     sent = t_.get(); 
    869             } catch (Exception^ e) { 
    870                 status = PJ_RETURN_OS_ERROR(e->HResult); 
    871             } 
    872             return sent; 
    873         }).get(); 
    874  
    875         return status; 
    876     }  
    877  
    878     s->is_busy_sending = true; 
    879     concurrency::create_task(s->socket_writer->StoreAsync()). 
    880         then([s](concurrency::task<unsigned int> t_) 
    881     { 
    882         try { 
    883             unsigned int l = t_.get(); 
    884             s->is_busy_sending = false; 
    885  
    886             // invoke callback 
    887             if (s->on_write) { 
    888                 (*s->on_write)(s, l); 
    889             } 
    890         } catch (Exception^ e) { 
    891             s->is_busy_sending = false; 
    892             if (s->sock_type == SOCKTYPE_STREAM) 
    893                 s->sock_state = SOCKSTATE_DISCONNECTED; 
    894  
    895             // invoke callback 
    896             if (s->on_write) { 
    897                 (*s->on_write)(s, -PJ_RETURN_OS_ERROR(e->HResult)); 
    898             } 
    899         } 
    900     }); 
    901  
    902     return PJ_EPENDING; 
    903 } 
    9041268 
    9051269/* 
     
    9161280 
    9171281    PjUwpSocket *s = (PjUwpSocket*)sock; 
    918  
    919     if ((s->sock_type!=SOCKTYPE_STREAM && s->sock_type!=SOCKTYPE_DATAGRAM) || 
    920         (s->sock_state!=SOCKSTATE_CONNECTED)) 
    921     { 
    922         return PJ_EINVALIDOP; 
    923     } 
    924   
    925     /* Sending for SOCKTYPE_DATAGRAM is implemented in pj_sock_sendto() */ 
    926     if (s->sock_type == SOCKTYPE_DATAGRAM) { 
    927         return pj_sock_sendto(sock, buf, len, flags, &s->remote_addr, 
    928                               pj_sockaddr_get_len(&s->remote_addr)); 
    929     } 
    930  
    931     return sock_send_imp(s, buf, len); 
     1282    return s->Send(buf, len); 
    9321283} 
    9331284 
     
    9491300     
    9501301    PjUwpSocket *s = (PjUwpSocket*)sock; 
    951  
    952     if (s->sock_type != SOCKTYPE_DATAGRAM || 
    953         s->sock_state < SOCKSTATE_INITIALIZED) 
    954     { 
    955         return PJ_EINVALIDOP; 
    956     } 
    957  
    958     if (s->is_busy_sending) 
    959         return PJ_STATUS_FROM_OS(OSERR_EWOULDBLOCK); 
    960  
    961     if (*len > (pj_ssize_t)s->send_buffer->Capacity) 
    962         return PJ_ETOOBIG; 
    963  
    964     HostName ^hostname; 
    965     int port; 
    966     sockaddr_to_hostname_port(to, hostname, &port); 
    967  
    968     concurrency::cancellation_token_source cts; 
    969     auto cts_token = cts.get_token(); 
    970     auto t = concurrency::create_task( 
    971                 s->datagram_sock->GetOutputStreamAsync( 
    972                     hostname, port.ToString()), cts_token); 
    973     pj_status_t status = PJ_SUCCESS; 
    974  
    975     cancel_after_timeout(t, cts, (unsigned int)WRITE_TIMEOUT). 
    976         then([s, cts_token, &status](concurrency::task<IOutputStream^> t_) 
    977     { 
    978         try { 
    979             if (cts_token.is_canceled()) { 
    980                 status = PJ_ETIMEDOUT; 
    981             } else { 
    982                 IOutputStream^ outstream = t_.get(); 
    983                 s->socket_writer = ref new DataWriter(outstream); 
    984             } 
    985         } catch (Exception^ e) { 
    986             status = PJ_RETURN_OS_ERROR(e->HResult); 
    987         } 
    988     }).get(); 
    989  
    990     if (status != PJ_SUCCESS) 
    991         return status; 
    992  
    993     status = sock_send_imp(s, buf, len); 
    994  
    995     if ((status == PJ_SUCCESS || status == PJ_EPENDING) && 
    996         s->sock_state < SOCKSTATE_CONNECTED) 
    997     { 
    998         s->sock_state = SOCKSTATE_CONNECTED; 
    999     } 
    1000  
    1001     return status; 
    1002 } 
    1003  
    1004  
    1005 static int consume_read_buffer(PjUwpSocket *s, void *buf, int max_len) 
    1006 { 
    1007     if (s->socket_reader->UnconsumedBufferLength == 0) 
    1008         return 0; 
    1009  
    1010     int read_len = PJ_MIN((int)s->socket_reader->UnconsumedBufferLength, 
    1011                           max_len); 
    1012     IBuffer^ buffer = s->socket_reader->ReadBuffer(read_len); 
    1013     read_len = buffer->Length; 
    1014     CopyFromIBuffer((unsigned char*)buf, read_len, buffer); 
    1015     return read_len; 
     1302    return s->SendTo(buf, len, to); 
    10161303} 
    10171304 
     
    10311318 
    10321319    PjUwpSocket *s = (PjUwpSocket*)sock; 
    1033  
    1034     /* Only for TCP, at least for now! */ 
    1035     if (s->sock_type == SOCKTYPE_DATAGRAM) 
    1036         return PJ_ENOTSUP; 
    1037  
    1038     if (s->sock_type != SOCKTYPE_STREAM || 
    1039         s->sock_state != SOCKSTATE_CONNECTED) 
    1040     { 
    1041         return PJ_EINVALIDOP; 
    1042     } 
    1043  
    1044     /* First check if there is already some data in the read buffer */ 
    1045     int avail_len = consume_read_buffer(s, buf, *len); 
    1046     if (avail_len > 0) { 
    1047         *len = avail_len; 
    1048         return PJ_SUCCESS; 
    1049     } 
    1050  
    1051     /* Start sync read */ 
    1052     if (s->is_blocking) { 
    1053         pj_status_t status = PJ_SUCCESS; 
    1054         concurrency::cancellation_token_source cts; 
    1055         auto cts_token = cts.get_token(); 
    1056         auto t = concurrency::create_task(s->socket_reader->LoadAsync(*len), cts_token); 
    1057         *len = cancel_after_timeout(t, cts, READ_TIMEOUT) 
    1058                     .then([s, len, buf, cts_token, &status](concurrency::task<unsigned int> t_) 
    1059         { 
    1060             try { 
    1061                 if (cts_token.is_canceled()) { 
    1062                     status = PJ_ETIMEDOUT; 
    1063                     return 0; 
    1064                 } 
    1065                 t_.get(); 
    1066             } catch (Exception^) { 
    1067                 status = PJ_ETIMEDOUT; 
    1068                 return 0; 
    1069             } 
    1070  
    1071             *len = consume_read_buffer(s, buf, *len); 
    1072             return (int)*len; 
    1073         }).get(); 
    1074  
    1075         return status; 
    1076     } 
    1077  
    1078     /* Start async read */ 
    1079     int read_len = *len; 
    1080     concurrency::create_task(s->socket_reader->LoadAsync(read_len)) 
    1081         .then([s, &read_len](concurrency::task<unsigned int> t_) 
    1082     { 
    1083         try { 
    1084             // catch any exception 
    1085             t_.get(); 
    1086  
    1087             // invoke callback 
    1088             read_len = PJ_MIN((int)s->socket_reader->UnconsumedBufferLength, 
    1089                               read_len); 
    1090             if (read_len > 0 && s->on_read) { 
    1091                 (*s->on_read)(s, read_len); 
    1092             } 
    1093         } catch (Exception^ e) { 
    1094             // invoke callback 
    1095             if (s->on_read) { 
    1096                 (*s->on_read)(s, -PJ_RETURN_OS_ERROR(e->HResult)); 
    1097             } 
    1098             return 0; 
    1099         } 
    1100  
    1101         return (int)read_len; 
    1102     }); 
    1103  
    1104     return PJ_EPENDING; 
     1320    return s->Recv(buf, len); 
    11051321} 
    11061322 
     
    11231339 
    11241340    PjUwpSocket *s = (PjUwpSocket*)sock; 
    1125  
    1126     if (s->sock_type != SOCKTYPE_DATAGRAM || 
    1127         s->sock_state < SOCKSTATE_INITIALIZED) 
    1128     { 
    1129         return PJ_EINVALIDOP; 
    1130     } 
    1131  
    1132     /* Start receive, if not yet */ 
    1133     if (s->datagram_recv_helper == nullptr) { 
    1134         s->datagram_recv_helper = ref new PjUwpSocketDatagramRecvHelper(s); 
    1135     } 
    1136  
    1137     /* Try to read any available data first */ 
    1138     pj_status_t status = s->datagram_recv_helper-> 
    1139                                         ReadDataIfAvailable(buf, len, from); 
    1140     if (status != PJ_ENOTFOUND) 
    1141         return status; 
    1142  
    1143     /* Start sync read */ 
    1144     if (s->is_blocking) { 
    1145         while (status == PJ_ENOTFOUND && s->sock_state <= SOCKSTATE_CONNECTED) 
    1146         { 
    1147             status = s->datagram_recv_helper-> 
    1148                                         ReadDataIfAvailable(buf, len, from); 
    1149             pj_thread_sleep(100); 
    1150         } 
    1151         return PJ_SUCCESS; 
    1152     } 
    1153  
    1154     /* For async read, just return PJ_EPENDING */ 
    1155     return PJ_EPENDING; 
     1341    pj_status_t status = s->RecvFrom(buf, len, from); 
     1342    if (status == PJ_SUCCESS) 
     1343        *fromlen = pj_sockaddr_get_len(from); 
     1344    return status; 
    11561345} 
    11571346 
     
    12221411} 
    12231412 
    1224 static pj_status_t tcp_bind(PjUwpSocket *s) 
    1225 { 
    1226     /* If no bound address is set, just return */ 
    1227     if (!pj_sockaddr_has_addr(&s->local_addr) && 
    1228         pj_sockaddr_get_port(&s->local_addr)==0) 
    1229     { 
    1230         return PJ_SUCCESS; 
    1231     } 
    1232  
    1233     HRESULT err = 0;     
    1234  
    1235     try { 
    1236         concurrency::create_task([s]() { 
    1237             HostName ^hostname; 
    1238             int port; 
    1239             sockaddr_to_hostname_port(&s->local_addr, hostname, &port); 
    1240  
    1241             s->listener_sock->BindEndpointAsync(hostname,  
    1242                                                 port.ToString()); 
    1243         }).then([s, &err](concurrency::task<void> t) 
    1244         { 
    1245             try { 
    1246                 t.get(); 
    1247                 s->sock_state = SOCKSTATE_CONNECTED; 
    1248             } catch (Exception^ e) { 
    1249                 err = e->HResult; 
    1250             } 
    1251         }).get(); 
    1252     } catch (Exception^ e) { 
    1253         err = e->HResult; 
    1254     } 
    1255  
    1256     return (err? PJ_RETURN_OS_ERROR(err) : PJ_SUCCESS); 
    1257 } 
    1258  
    12591413 
    12601414/* 
     
    12671421    PJ_CHECK_STACK(); 
    12681422    PJ_ASSERT_RETURN(sock && addr, PJ_EINVAL); 
    1269  
    12701423    PJ_UNUSED_ARG(namelen); 
    12711424 
    12721425    PjUwpSocket *s = (PjUwpSocket*)sock; 
    1273     pj_status_t status; 
    1274  
    1275     pj_sockaddr_cp(&s->remote_addr, addr); 
    1276  
    1277     /* UDP */ 
    1278  
    1279     if (s->sock_type == SOCKTYPE_DATAGRAM) { 
    1280         if (s->sock_state != SOCKSTATE_INITIALIZED) 
    1281             return PJ_EINVALIDOP; 
    1282          
    1283         HostName ^hostname; 
    1284         int port; 
    1285         sockaddr_to_hostname_port(addr, hostname, &port); 
    1286  
    1287         try { 
    1288             concurrency::create_task(s->datagram_sock->ConnectAsync 
    1289                                                    (hostname, port.ToString())) 
    1290                 .then([s](concurrency::task<void> t_) 
    1291             { 
    1292                 try { 
    1293                     t_.get(); 
    1294                 } catch (Exception^ ex)  
    1295                 { 
    1296                  
    1297                 } 
    1298             }).get(); 
    1299         } catch (Exception^) { 
    1300             return PJ_EUNKNOWN; 
    1301         } 
    1302  
    1303         // Update local & remote address 
    1304         wstr_addr_to_sockaddr(s->datagram_sock->Information->RemoteAddress->CanonicalName->Data(), 
    1305                               s->datagram_sock->Information->RemotePort->Data(), 
    1306                               &s->remote_addr); 
    1307         wstr_addr_to_sockaddr(s->datagram_sock->Information->LocalAddress->CanonicalName->Data(), 
    1308                               s->datagram_sock->Information->LocalPort->Data(), 
    1309                               &s->local_addr); 
    1310  
    1311         s->sock_state = SOCKSTATE_CONNECTED; 
    1312          
    1313         return PJ_SUCCESS; 
    1314     } 
    1315  
    1316     /* TCP */ 
    1317  
    1318     /* Init stream socket now */ 
    1319     s->InitSocket(SOCKTYPE_STREAM); 
    1320  
    1321     pj_sockaddr_cp(&s->remote_addr, addr); 
    1322     wstr_addr_to_sockaddr(s->stream_sock->Information->LocalAddress->CanonicalName->Data(), 
    1323                           s->stream_sock->Information->LocalPort->Data(), 
    1324                           &s->local_addr); 
    1325  
    1326     /* Perform any pending bind */ 
    1327     status = tcp_bind(s); 
    1328     if (status != PJ_SUCCESS) 
    1329         return status; 
    1330  
    1331     char tmp[PJ_INET6_ADDRSTRLEN]; 
    1332     wchar_t wtmp[PJ_INET6_ADDRSTRLEN]; 
    1333     pj_sockaddr_print(addr, tmp, PJ_INET6_ADDRSTRLEN, 0); 
    1334     pj_ansi_to_unicode(tmp, pj_ansi_strlen(tmp), wtmp, 
    1335                        PJ_INET6_ADDRSTRLEN); 
    1336     auto host = ref new HostName(ref new String(wtmp)); 
    1337     int port = pj_sockaddr_get_port(addr); 
    1338  
    1339     auto t = concurrency::create_task(s->stream_sock->ConnectAsync 
    1340              (host, port.ToString(), SocketProtectionLevel::PlainSocket)) 
    1341              .then([=](concurrency::task<void> t_) 
    1342     { 
    1343         try { 
    1344             t_.get(); 
    1345             s->socket_reader = ref new DataReader(s->stream_sock->InputStream); 
    1346             s->socket_writer = ref new DataWriter(s->stream_sock->OutputStream); 
    1347  
    1348             // Update local & remote address 
    1349             wstr_addr_to_sockaddr(s->stream_sock->Information->RemoteAddress->CanonicalName->Data(), 
    1350                                   s->stream_sock->Information->RemotePort->Data(), 
    1351                                   &s->remote_addr); 
    1352             wstr_addr_to_sockaddr(s->stream_sock->Information->LocalAddress->CanonicalName->Data(), 
    1353                                   s->stream_sock->Information->LocalPort->Data(), 
    1354                                   &s->local_addr); 
    1355  
    1356             s->sock_state = SOCKSTATE_CONNECTED; 
    1357  
    1358             if (!s->is_blocking && s->on_connect) { 
    1359                 (*s->on_connect)(s, PJ_SUCCESS); 
    1360             } 
    1361             return (pj_status_t)PJ_SUCCESS; 
    1362         } catch (Exception^ ex) { 
    1363             SocketErrorStatus status = SocketError::GetStatus(ex->HResult); 
    1364  
    1365             switch (status) 
    1366             { 
    1367             case SocketErrorStatus::UnreachableHost: 
    1368                 break; 
    1369             case SocketErrorStatus::ConnectionTimedOut: 
    1370                 break; 
    1371             case SocketErrorStatus::ConnectionRefused: 
    1372                 break; 
    1373             default: 
    1374                 break; 
    1375             } 
    1376  
    1377             if (!s->is_blocking && s->on_connect) { 
    1378                 (*s->on_connect)(s, PJ_EUNKNOWN); 
    1379             } 
    1380  
    1381             return (pj_status_t)PJ_EUNKNOWN; 
    1382         } 
    1383     }); 
    1384  
    1385     if (!s->is_blocking) 
    1386         return PJ_EPENDING; 
    1387  
    1388     try { 
    1389         status = t.get(); 
    1390     } catch (Exception^) { 
    1391         return PJ_EUNKNOWN; 
    1392     } 
    1393     return status; 
     1426    return s->Connect(addr); 
    13941427} 
    13951428 
     
    14201453 
    14211454    PjUwpSocket *s = (PjUwpSocket*)sock; 
    1422     pj_status_t status; 
    1423  
    1424     /* Init listener socket now */ 
    1425     s->InitSocket(SOCKTYPE_LISTENER); 
    1426  
    1427     /* Perform any pending bind */ 
    1428     status = tcp_bind(s); 
    1429     if (status != PJ_SUCCESS) 
    1430         return status; 
    1431  
    1432     /* Start listen */ 
    1433     if (s->listener_helper == nullptr) { 
    1434         s->listener_helper = ref new PjUwpSocketListenerHelper(s); 
    1435     } 
    1436  
    1437     return PJ_SUCCESS; 
     1455    return s->Listen(); 
    14381456} 
    14391457 
     
    14461464                                    int *addrlen) 
    14471465{ 
    1448     PJ_CHECK_STACK(); 
    1449  
     1466    pj_status_t status; 
     1467 
     1468    PJ_CHECK_STACK(); 
    14501469    PJ_ASSERT_RETURN(serverfd && newsock, PJ_EINVAL); 
    14511470 
    14521471    PjUwpSocket *s = (PjUwpSocket*)serverfd; 
    1453  
    1454     if (s->sock_type != SOCKTYPE_LISTENER || 
    1455         s->sock_state != SOCKSTATE_INITIALIZED) 
    1456     { 
    1457         return PJ_EINVALIDOP; 
    1458     } 
    1459  
    1460     StreamSocket^ accepted_sock; 
    1461     pj_status_t status = s->listener_helper->GetAcceptedSocket(accepted_sock); 
    1462     if (status == PJ_ENOTFOUND) 
    1463         return PJ_EPENDING; 
    1464  
     1472    PjUwpSocket *new_uwp_sock; 
     1473 
     1474    status = s->Accept(&new_uwp_sock); 
    14651475    if (status != PJ_SUCCESS) 
    14661476        return status; 
    1467  
    1468     PjUwpSocket *new_sock = s->CreateAcceptSocket(accepted_sock); 
    1469  
    1470     pj_sockaddr_cp(addr, &new_sock->remote_addr); 
    1471     *addrlen = pj_sockaddr_get_len(&new_sock->remote_addr); 
    1472     *newsock = (pj_sock_t)new_sock; 
     1477    if (newsock == NULL) 
     1478        return PJ_ENOTFOUND; 
     1479 
     1480    *newsock = (pj_sock_t)new_uwp_sock; 
     1481    pj_sockaddr_cp(addr, new_uwp_sock->GetRemoteAddr()); 
     1482    *addrlen = pj_sockaddr_get_len(addr); 
    14731483 
    14741484    return PJ_SUCCESS; 
Note: See TracChangeset for help on using the changeset viewer.