Ignore:
Timestamp:
Dec 5, 2013 3:03:36 AM (10 years ago)
Author:
bennylp
Message:

Re #1519: fixed threading issues on Python. On Python, only threads created by Python can call Python. This creates problem with calling callback from worker thread. The SIP worker thread can be disabled, but we have other worker threads such as the sound device that cannot be disabled. The solution in this patch is to create small framework to post a job to "main thread" during libHandleEvents(). The main thread is thread that calls libCreate().

Location:
pjproject/branches/projects/pjsua2/pjsip
Files:
3 edited

Legend:

Unmodified
Added
Removed
  • pjproject/branches/projects/pjsua2/pjsip/include/pjsua2/endpoint.hpp

    r4672 r4678  
    2727#include <pjsua2/media.hpp> 
    2828#include <pjsua2/siptypes.hpp> 
     29#include <list> 
    2930 
    3031/** PJSUA2 API is inside pj namespace */ 
     
    184185     */ 
    185186    unsigned            threadCnt; 
     187 
     188    /** 
     189     * When this flag is non-zero, all callbacks that come from thread 
     190     * other than main thread will be posted to the main thread and 
     191     * to be executed by Endpoint::libHandleEvents() function. This 
     192     * includes the logging callback. Note that this will only work if 
     193     * threadCnt is set to zero and Endpoint::libHandleEvents() is 
     194     * performed by main thread. By default, the main thread is set 
     195     * from the thread that invoke Endpoint::libCreate() 
     196     * 
     197     * Default: false 
     198     */ 
     199    bool                mainThreadOnly; 
    186200 
    187201    /** 
     
    635649}; 
    636650 
     651/* This represents posted job */ 
     652struct PendingJob 
     653{ 
     654    /** Perform the job */ 
     655    virtual void execute(bool is_pending) = 0; 
     656 
     657    /** Virtual destructor */ 
     658    virtual ~PendingJob() {} 
     659}; 
     660 
    637661////////////////////////////////////////////////////////////////////////////// 
    638662 
     
    717741     * structure, because polling then will be done by these worker threads 
    718742     * instead. 
     743     * 
     744     * If EpConfig::UaConfig::mainThreadOnly is enabled and this function 
     745     * is called from the main thread (by default the main thread is thread 
     746     * that calls libCreate()), this function will also scan and run any 
     747     * pending jobs in the list. 
    719748     * 
    720749     * @param msec_timeout Maximum time to wait, in miliseconds. 
     
    765794 
    766795    /** 
     796     * Write a log entry. 
     797     * 
     798     * @param e                 The log entry. 
     799     */ 
     800    void utilLogWrite(LogEntry &e); 
     801 
     802    /** 
    767803     * This is a utility function to verify that valid SIP url is given. If the 
    768804     * URL is a valid SIP/SIPS scheme, PJ_SUCCESS will be returned. 
     
    814850     */ 
    815851    void utilTimerCancel(Token prmToken); 
     852 
     853    /** 
     854     * Utility to register a pending job to be executed by main thread. 
     855     * If EpConfig::UaConfig::mainThreadOnly is false, the job will be 
     856     * executed immediately. 
     857     * 
     858     * @param job               The job class. 
     859     */ 
     860    void utilAddPendingJob(PendingJob *job); 
    816861 
    817862    /** 
     
    11221167    { PJ_UNUSED_ARG(prm); } 
    11231168 
    1124  
    11251169private: 
    11261170    static Endpoint             *instance_;     // static instance 
     
    11291173    AudDevManager                audioDevMgr; 
    11301174    CodecInfoVector              codecInfoList; 
     1175 
     1176    /* Pending logging */ 
     1177    bool                         mainThreadOnly; 
     1178    void                        *mainThread; 
     1179    unsigned                     pendingJobSize; 
     1180    std::list<PendingJob*>       pendingJobs; 
     1181 
     1182    void performPendingJobs(); 
    11311183 
    11321184    /* Endpoint static callbacks */ 
  • pjproject/branches/projects/pjsua2/pjsip/src/pjsua2/endpoint.cpp

    r4674 r4678  
    112112    NODE_READ_UNSIGNED( this_node, maxCalls); 
    113113    NODE_READ_UNSIGNED( this_node, threadCnt); 
     114    NODE_READ_BOOL    ( this_node, mainThreadOnly); 
    114115    NODE_READ_STRINGV ( this_node, nameserver); 
    115116    NODE_READ_STRING  ( this_node, userAgent); 
     
    126127    NODE_WRITE_UNSIGNED( this_node, maxCalls); 
    127128    NODE_WRITE_UNSIGNED( this_node, threadCnt); 
     129    NODE_WRITE_BOOL    ( this_node, mainThreadOnly); 
    128130    NODE_WRITE_STRINGV ( this_node, nameserver); 
    129131    NODE_WRITE_STRING  ( this_node, userAgent); 
     
    342344 
    343345/////////////////////////////////////////////////////////////////////////////// 
     346/* Class to post log to main thread */ 
     347struct PendingLog : public PendingJob 
     348{ 
     349    LogEntry entry; 
     350    virtual void execute(bool is_pending) 
     351    { 
     352        PJ_UNUSED_ARG(is_pending); 
     353        Endpoint::instance().utilLogWrite(entry); 
     354    } 
     355}; 
     356 
     357/////////////////////////////////////////////////////////////////////////////// 
    344358/* 
    345359 * Endpoint instance 
    346360 */ 
    347361Endpoint::Endpoint() 
    348 : writer(NULL) 
     362: writer(NULL), mainThreadOnly(false), mainThread(NULL), pendingJobSize(0) 
    349363{ 
    350364    if (instance_) { 
     
    365379Endpoint::~Endpoint() 
    366380{ 
     381    while (!pendingJobs.empty()) { 
     382        delete pendingJobs.front(); 
     383        pendingJobs.pop_front(); 
     384    } 
     385 
    367386    try { 
    368387        libDestroy(); 
     
    383402} 
    384403 
     404void Endpoint::utilAddPendingJob(PendingJob *job) 
     405{ 
     406    enum { 
     407        MAX_PENDING_JOBS = 1024 
     408    }; 
     409 
     410    /* See if we can execute immediately */ 
     411    if (!mainThreadOnly || pj_thread_this()==mainThread) { 
     412        job->execute(false); 
     413        delete job; 
     414        return; 
     415    } 
     416 
     417    if (pendingJobSize > MAX_PENDING_JOBS) { 
     418        enum { NUMBER_TO_DISCARD = 5 }; 
     419 
     420        pj_enter_critical_section(); 
     421        for (unsigned i=0; i<NUMBER_TO_DISCARD; ++i) { 
     422            delete pendingJobs.back(); 
     423            pendingJobs.pop_back(); 
     424        } 
     425 
     426        pendingJobSize -= NUMBER_TO_DISCARD; 
     427        pj_leave_critical_section(); 
     428 
     429        utilLogWrite(1, THIS_FILE, 
     430                     "*** ERROR: Job queue full!! Jobs discarded!!! ***"); 
     431    } 
     432 
     433    pj_enter_critical_section(); 
     434    pendingJobs.push_back(job); 
     435    pendingJobSize++; 
     436    pj_leave_critical_section(); 
     437} 
     438 
     439/* Handle log callback */ 
     440void Endpoint::utilLogWrite(LogEntry &entry) 
     441{ 
     442    if (mainThreadOnly && pj_thread_this() != mainThread) { 
     443        PendingLog *job = new PendingLog; 
     444        job->entry = entry; 
     445        utilAddPendingJob(job); 
     446    } else { 
     447        writer->write(entry); 
     448    } 
     449} 
     450 
     451/* Run pending jobs only in main thread */ 
     452void Endpoint::performPendingJobs() 
     453{ 
     454    if (pj_thread_this() != mainThread) 
     455        return; 
     456 
     457    if (pendingJobSize == 0) 
     458        return; 
     459 
     460    for (;;) { 
     461        PendingJob *job = NULL; 
     462 
     463        pj_enter_critical_section(); 
     464        if (pendingJobSize != 0) { 
     465            job = pendingJobs.front(); 
     466            pendingJobs.pop_front(); 
     467            pendingJobSize--; 
     468        } 
     469        pj_leave_critical_section(); 
     470 
     471        if (job) { 
     472            job->execute(true); 
     473            delete job; 
     474        } else 
     475            break; 
     476    } 
     477} 
     478 
    385479/////////////////////////////////////////////////////////////////////////////// 
    386480/* 
     
    400494    entry.threadName = string(pj_thread_get_name(pj_thread_this())); 
    401495 
    402     ep.writer->write(entry); 
     496    ep.utilLogWrite(entry); 
    403497} 
    404498 
     
    837931} 
    838932 
     933struct PendingOnDtmfDigitCallback : public PendingJob 
     934{ 
     935    int call_id; 
     936    OnDtmfDigitParam prm; 
     937 
     938    virtual void execute(bool is_pending) 
     939    { 
     940        PJ_UNUSED_ARG(is_pending); 
     941 
     942        Call *call = Call::lookup(call_id); 
     943        if (!call) 
     944            return; 
     945 
     946        call->onDtmfDigit(prm); 
     947    } 
     948}; 
     949 
    839950void Endpoint::on_dtmf_digit(pjsua_call_id call_id, int digit) 
    840951{ 
     
    844955    } 
    845956     
    846     OnDtmfDigitParam prm; 
     957    PendingOnDtmfDigitCallback *job = new PendingOnDtmfDigitCallback; 
     958    job->call_id = call_id; 
    847959    char buf[10]; 
    848960    pj_ansi_sprintf(buf, "%c", digit); 
    849     prm.digit = (string)buf; 
    850      
    851     call->onDtmfDigit(prm); 
     961    job->prm.digit = (string)buf; 
     962     
     963    Endpoint::instance().utilAddPendingJob(job); 
    852964} 
    853965 
     
    9821094} 
    9831095 
     1096 
     1097struct PendingOnMediaTransportCallback : public PendingJob 
     1098{ 
     1099    int call_id; 
     1100    OnCallMediaTransportStateParam prm; 
     1101 
     1102    virtual void execute(bool is_pending) 
     1103    { 
     1104        PJ_UNUSED_ARG(is_pending); 
     1105 
     1106        Call *call = Call::lookup(call_id); 
     1107        if (!call) 
     1108            return; 
     1109 
     1110        call->onCallMediaTransportState(prm); 
     1111    } 
     1112}; 
     1113 
    9841114pj_status_t 
    9851115Endpoint::on_call_media_transport_state(pjsua_call_id call_id, 
     
    9901120        return PJ_SUCCESS; 
    9911121    } 
    992      
    993     OnCallMediaTransportStateParam prm; 
    994     prm.medIdx = info->med_idx; 
    995     prm.state = info->state; 
    996     prm.status = info->status; 
    997     prm.sipErrorCode = info->sip_err_code; 
    998      
    999     call->onCallMediaTransportState(prm); 
    1000      
     1122 
     1123    PendingOnMediaTransportCallback *job = new PendingOnMediaTransportCallback; 
     1124     
     1125    job->call_id = call_id; 
     1126    job->prm.medIdx = info->med_idx; 
     1127    job->prm.state = info->state; 
     1128    job->prm.status = info->status; 
     1129    job->prm.sipErrorCode = info->sip_err_code; 
     1130     
     1131    Endpoint::instance().utilAddPendingJob(job); 
     1132 
    10011133    return PJ_SUCCESS; 
    10021134} 
     1135 
     1136struct PendingOnMediaEventCallback : public PendingJob 
     1137{ 
     1138    int call_id; 
     1139    OnCallMediaEventParam prm; 
     1140 
     1141    virtual void execute(bool is_pending) 
     1142    { 
     1143        Call *call = Call::lookup(call_id); 
     1144        if (!call) 
     1145            return; 
     1146 
     1147        if (is_pending) { 
     1148            /* Can't do this anymore, pointer is invalid */ 
     1149            prm.ev.pjMediaEvent = NULL; 
     1150        } 
     1151 
     1152        call->onCallMediaEvent(prm); 
     1153    } 
     1154}; 
    10031155 
    10041156void Endpoint::on_call_media_event(pjsua_call_id call_id, 
     
    10111163    } 
    10121164     
    1013     OnCallMediaEventParam prm; 
    1014     prm.medIdx = med_idx; 
    1015     prm.ev.fromPj(*event); 
    1016      
    1017     call->onCallMediaEvent(prm); 
     1165    PendingOnMediaEventCallback *job = new PendingOnMediaEventCallback; 
     1166 
     1167    job->call_id = call_id; 
     1168    job->prm.medIdx = med_idx; 
     1169    job->prm.ev.fromPj(*event); 
     1170     
     1171    Endpoint::instance().utilAddPendingJob(job); 
    10181172} 
    10191173 
     
    10581212{ 
    10591213    PJSUA2_CHECK_EXPR( pjsua_create() ); 
     1214    mainThread = pj_thread_this(); 
    10601215} 
    10611216 
     
    10801235        log_cfg.cb = &Endpoint::logFunc; 
    10811236    } 
     1237    mainThreadOnly = prmEpConfig.uaConfig.mainThreadOnly; 
    10821238 
    10831239    /* Setup UA callbacks */ 
     
    11361292int Endpoint::libHandleEvents(unsigned msec_timeout) 
    11371293{ 
     1294    performPendingJobs(); 
    11381295    return pjsua_handle_events(msec_timeout); 
    11391296} 
  • pjproject/branches/projects/pjsua2/pjsip/src/pjsua2/media.cpp

    r4676 r4678  
    377377 
    378378AudDevManager::AudDevManager() 
    379 { 
    380     devMedia = new DevAudioMedia; 
     379: devMedia(NULL) 
     380{ 
    381381} 
    382382 
     
    394394AudioMedia &AudDevManager::getCaptureDevMedia() throw(Error) 
    395395{ 
     396    if (!devMedia) 
     397        devMedia = new DevAudioMedia; 
    396398    return *devMedia; 
    397399} 
     
    404406AudioMedia &AudDevManager::getPlaybackDevMedia() throw(Error) 
    405407{ 
     408    if (!devMedia) 
     409        devMedia = new DevAudioMedia; 
    406410    return *devMedia; 
    407411} 
Note: See TracChangeset for help on using the changeset viewer.