
List:       helix-server-cvs
Subject:    [Server-cvs] engine/inputsource Umakefil, 1.11.8.1.4.1,
From:       jgordon () helixcommunity ! org
Date:       2008-10-23 18:52:24
Message-ID: 200810231859.m9NIx12R031227 () mailer ! progressive-comp ! com

Update of /cvsroot/server/engine/inputsource
In directory cvs01.internal.helixcommunity.org:/tmp/cvs-serv29899

Modified Files:
      Tag: SERVER_12_0
	Umakefil base_shim.cpp live_source_wrapper.cpp 
Log Message:
Synopsis
========
Fixes PR 227668
Fixes numerous issues that break sync when live RTP streams are
switched

Branches: SERVER_12_0
Reviewer: jzeng


Description
===========
Many timestamp calculations and comparisons did not handle timestamp
rollover. They are fixed to handle rollover.
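
A minimal sketch of a rollover-safe comparison, assuming 32-bit
timestamps on a circular timeline. DiffTimeStampSketch and
IsLaterSketch are hypothetical stand-ins; the real DiffTimeStamp lives
in tscalc.h, which is not shown in this message, and may differ.

```cpp
#include <cstdint>

// Hypothetical stand-in for DiffTimeStamp from tscalc.h (not shown here).
// Returns the signed distance from b to a on a 32-bit circular timeline.
// The unsigned subtraction wraps modulo 2^32; the cast to int32_t is
// modular on two's-complement targets (guaranteed since C++20).
inline int32_t DiffTimeStampSketch(uint32_t a, uint32_t b)
{
    return static_cast<int32_t>(a - b);
}

// True when a is later than b, even if the counter wrapped in between.
inline bool IsLaterSketch(uint32_t a, uint32_t b)
{
    return DiffTimeStampSketch(a, b) > 0;
}
```

With a = 5 and b = 0xFFFFFFFB the plain comparison a > b is false,
while the signed difference reports that a is 10 ticks later.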

Some comparisons and calculations in RTP live used the delivery
timestamp where the RTP timestamp is needed.

Using the TimestampConverter can cause rollover issues for certain
things, like comparisons between two different RTP streams. It also
loses precision in the conversion to ms. These cases are changed to
use precise conversion to Timeval, etc.

Timeval did not work correctly with negative values.

NTPTime has no conversion to usable units more precise than ms, so
toTimeval() is added. Note: There is some weirdness with the
pre-existing FromTimeval that is a little awkward here and needs
thought for HEAD.
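
A rough sketch of the kind of conversion meant here, assuming the
usual NTP layout of 32-bit seconds plus a 32-bit binary fraction of a
second. TimevalSketch and NtpToTimevalSketch are hypothetical names;
the toTimeval() actually added to ntptime.h is not shown in this
message and may be implemented differently.

```cpp
#include <cstdint>

// Hypothetical seconds + microseconds pair (not the server's Timeval).
struct TimevalSketch
{
    int64_t sec;
    int32_t usec;
};

// Convert an NTP timestamp to microsecond precision.
// The fraction counts units of 1/2^32 s, so usec = fraction * 10^6 / 2^32.
// A 64-bit intermediate keeps the multiplication from overflowing.
inline TimevalSketch NtpToTimevalSketch(uint32_t ntpSeconds, uint32_t ntpFraction)
{
    const uint64_t usec =
        (static_cast<uint64_t>(ntpFraction) * 1000000ULL) >> 32;
    return TimevalSketch{ static_cast<int64_t>(ntpSeconds),
                          static_cast<int32_t>(usec) };
}
```

A fraction of 0x80000000 is exactly half a second, which a ms-only
conversion also represents, but smaller steps such as 1/90000 s (one
tick of a 90 kHz video clock) only survive at this finer resolution.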

The comparisons that skip late, out-of-order packets after a switch
on RM live were causing many RTP live packets to be skipped
incorrectly, because of different timestamps etc. For RTP live
streams this timestamp check is now skipped and the ingress seq
number is checked instead. Only the first 10 packets after a switch
are checked now, to avoid inappropriate skipping after we get halfway
through the seq-no space!
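
The seq-number check with its limited window can be sketched roughly
as follows. DiffRTPSeqNoSketch and OldPacketFilterSketch are
hypothetical; the real DiffRTPSeqNo is in tscalc.h (not shown here),
and the real logic in AdjustTimeStamp also tracks per-stream start
state, which this sketch leaves out.

```cpp
#include <cstdint>

// Signed distance from b to a in the 16-bit RTP sequence-number space.
// Modular on two's-complement targets (guaranteed since C++20).
inline int16_t DiffRTPSeqNoSketch(uint16_t a, uint16_t b)
{
    return static_cast<int16_t>(static_cast<uint16_t>(a - b));
}

// Skips packets older than the first packet seen after a switch, but only
// while the check window is open. Once the window is used up, nothing is
// skipped, so packets half a sequence space later are never misjudged.
struct OldPacketFilterSketch
{
    uint32_t remainingChecks; // e.g. 10, as in g_ulOutOfOrderPacketsCheck
    uint16_t startSeq;        // seq number of the first packet after the switch

    bool ShouldSkip(uint16_t seq)
    {
        if (remainingChecks == 0)
        {
            return false;
        }
        --remainingChecks;
        return DiffRTPSeqNoSketch(seq, startSeq) < 0;
    }
};
```

A signed 16-bit difference is only meaningful within 32768 packets of
the reference, which is why the check is limited to a short window
right after the switch.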


RTP live streams were being synced to the time of the earliest SR
report. They instead need to be synced to the first packet of the
audio stream, with the SR times used only to determine the offsets
between the streams, etc. This is a major issue for RTP live because
the encoder's RTCP timeline tends to be a few seconds off from the
RTP streams. So for each other stream, take the (signed) NTP diff
between the two SRs PLUS the (signed, rollover-aware, converted) diff
between the sync packet's RTP time and the sync stream's SR RTP time,
add that to the stream's own SR RTP time (converted), then convert
the result back to RTP timestamp units.
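
The offset arithmetic above can be sketched as below. This is an
illustration only: SenderReportSketch and StreamStartRtpSketch are
hypothetical names, and plain double seconds are used for brevity
where the commit itself uses Timeval and NTPTime.

```cpp
#include <cassert>
#include <cmath>
#include <cstdint>

// One stream's RTCP sender report, reduced to what the sync math needs.
struct SenderReportSketch
{
    double   ntpSeconds; // SR wall-clock time, in seconds
    uint32_t rtpTime;    // RTP timestamp carried in the same SR
    uint32_t frequency;  // RTP clock rate of the stream, in Hz
};

// RTP timestamp of stream `other` at the instant of the sync stream's
// first packet (syncPacketRtp).
inline uint32_t StreamStartRtpSketch(const SenderReportSketch& sync,
                                     uint32_t syncPacketRtp,
                                     const SenderReportSketch& other)
{
    assert(sync.frequency > 0 && other.frequency > 0);

    // Signed, rollover-aware distance from the sync SR to the sync packet.
    const int32_t pktMinusSr = static_cast<int32_t>(syncPacketRtp - sync.rtpTime);
    double offsetSec = static_cast<double>(pktMinusSr) / sync.frequency;

    // Signed distance between the two SRs on the NTP timeline.
    offsetSec += sync.ntpSeconds - other.ntpSeconds;

    // Add the other stream's own SR RTP time, converted to seconds.
    offsetSec += static_cast<double>(other.rtpTime) / other.frequency;

    // Convert back to RTP units and wrap into 32 bits.
    const double ticks = std::floor(offsetSec * other.frequency + 0.5);
    return static_cast<uint32_t>(static_cast<int64_t>(ticks));
}
```

For example, with an 8 kHz audio SR at NTP 100.0 s (RTP 8000), a first
audio packet at RTP 12000, and a 90 kHz video SR at NTP 101.0 s
(RTP 180000), the audio packet lies 0.5 s before the video SR, so the
video start offset is 180000 - 45000 = 135000.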

We have to make sure that the earliest packet we send is the last audio
packet before the next video key frame. Also, we need to make sure that
this earliest packet is sent first, regardless of the order in which
our GetPackets came in, etc.

TODO:
Much of this is hacked in and needs to be moved for 12.1+. A large
portion of the stuff this adds to RTP live FCS needs to be moved to
qtbcplin RTP live general (and much of that will also be needed for
LRA); and a lot of this stuff that is in RTP live FCS should be in more
general FCS.


Files Affected
==============
common/util/pub/ntptime.h
common/util/pub/timeval.h
server/include/sink.h
server/common/util/pub/tscalc.h[new]
server/engine/inputsource/Umakefil
server/engine/inputsource/base_shim.cpp
server/engine/inputsource/live_source_wrapper.cpp
server/engine/inputsource/pub/base_shim.h
server/engine/inputsource/pub/live_source_wrapper.h


Testing Performed
=================
Integration Tests:
* Verified RTP live plays and maintains reasonable A/V sync with
Microcore player after 20 switches
* Verified on-demand 3gp switching still works as expected
* Verified on-demand RM switching
* Verified RM live switching

Leak Tests:
* PENDING: --lct with switch request

Platforms Tested: linux-rhel4-i686
Build verified: linux-rhel4-i686, sunos-5.10-sparc-server, win32-i386-vc7



Index: base_shim.cpp
===================================================================
RCS file: /cvsroot/server/engine/inputsource/base_shim.cpp,v
retrieving revision 1.1.2.8.2.14
retrieving revision 1.1.2.8.2.15
diff -u -d -r1.1.2.8.2.14 -r1.1.2.8.2.15
--- base_shim.cpp	17 Oct 2008 22:31:53 -0000	1.1.2.8.2.14
+++ base_shim.cpp	23 Oct 2008 18:52:22 -0000	1.1.2.8.2.15
@@ -51,6 +51,7 @@
 #include "hxmarsh.h"
 #include "tconverter.h"
 #include "tsfixup.h"
+#include "tscalc.h"
 
 #include "hxqos.h"
 #include "hxqosinfo.h"
@@ -66,6 +67,8 @@
 #include "ratedescmgr.h"
 #include "livekeyframe.h"
 
+const UINT32 BasePacketShim::g_ulOutOfOrderPacketsCheck = 10;
+
 BasePacketShim* BasePacketShim::CreatePacketShim(BOOL bMDP,
                                                     IUnknown* pSource,
                                                     IHXRegistry* pReg,
@@ -125,6 +128,7 @@
     ,m_bAllStreamsStarted(TRUE)
     ,m_unNumStreamsStarted(0)
     ,m_unNumStreamsMovedPastStartTS(0)
+    ,m_ulOldPacketCheck(0)
 {
     if (pCCF)
     {
@@ -150,18 +154,12 @@
      * because the pkt ts < m_ulStartingTS) until all streams' packets have 
      * gone past the m_ulStartingTS.
      */
-    m_pbFirstPacketFromSwitchedSource = new BOOL[num_streams];
     m_pbStreamHasMovedPastStartTS = new BOOL[num_streams];
-    m_pulStartingTSPerStream = new UINT32[num_streams];
-    m_pulStartingRTPTSPerStream = new UINT32[num_streams];
     m_pStreamInfo = new FCSStreamData[num_streams];
     HX_ASSERT(m_pStreamInfo);
 
     memset(m_pStreamInfo, 0, (sizeof(FCSStreamData) * num_streams));
-    memset(m_pbFirstPacketFromSwitchedSource, 0xff, (sizeof(BOOL) * num_streams));
     memset(m_pbStreamHasMovedPastStartTS, 0xff, (sizeof(BOOL) * num_streams));
-    memset(m_pulStartingTSPerStream, 0xff, (sizeof(UINT32) * num_streams));
-    memset(m_pulStartingRTPTSPerStream, 0xff, (sizeof(UINT32) * num_streams));
 
     for(UINT16 i = 0; i < num_streams; i++)
     {
@@ -170,10 +168,7 @@
             continue;
         }
 
-        m_pbFirstPacketFromSwitchedSource[i] = TRUE;
 	m_pbStreamHasMovedPastStartTS[i] = FALSE;
-	m_pulStartingTSPerStream[i] = 0;
-	m_pulStartingRTPTSPerStream[i] = 0;
 	m_unSubscribedStreamCount++;
 
         //tsconverter only needed for 3gp datatypes
@@ -268,11 +263,7 @@
 
     _Done();
 
-    HX_DELETE(m_pbFirstPacketFromSwitchedSource);
     HX_DELETE(m_pbStreamHasMovedPastStartTS);
-    HX_DELETE(m_pulStartingTSPerStream);
-    HX_DELETE(m_pulStartingRTPTSPerStream);
-
     HX_RELEASE(m_pCCF);
 
     if(m_pStreamInfo)
@@ -314,20 +305,13 @@
     m_unNumStreamsMovedPastStartTS = 0;
 
     UINT32 ulOffset = 0;
-    UINT ulRTPOffsetInMSec = 0;
+    UINT32 ulRTPOffsetInMSec = 0;
     UINT32 i = 0;
     for(i = 0; i < m_uNumOfStreams; i++)
     {
-	m_pbFirstPacketFromSwitchedSource[i] = TRUE;
-	m_pulStartingTSPerStream[i] = 0;
-	m_pulStartingRTPTSPerStream[i] = 0;
 	m_pbStreamHasMovedPastStartTS[i] = FALSE;
 
         ulOffset = m_pStreamInfo[i].ulLastPacketTS + m_pStreamInfo[i].ulPacketDiff;
-        if(ulOffset > m_ulDeliveryTimeOffset)
-        {
-            m_ulDeliveryTimeOffset = ulOffset;
-        }
 
         if(m_pStreamInfo[i].pTSConverter)
         {
@@ -339,8 +323,9 @@
             ulRTPOffsetInMSec = m_pStreamInfo[i].ulLastPacketRTPTS + m_pStreamInfo[i].ulPacketRTPDiff;
         }
 
-        if(ulRTPOffsetInMSec > m_ulRTPTimeOffsetInMSec)
+        if(DiffTimeStamp(ulRTPOffsetInMSec, m_ulRTPTimeOffsetInMSec) > 0)
         {
+            m_ulDeliveryTimeOffset = ulOffset;
             m_ulRTPTimeOffsetInMSec = ulRTPOffsetInMSec;
         }
     }
@@ -373,9 +358,7 @@
 void
 BasePacketShim::ParsePacket(IHXPacket* pPacket, FCSPacketData* pPacketData)
 {
-    UINT32 ulTS = 0;
-    IHXBuffer* pBuffer = NULL;
-    HX_RESULT ulRet = HXR_OK;
+    pPacketData->unSeqNo = 0;
 
     if(m_bFirstPacketFromSource)
     {
@@ -399,7 +382,20 @@
     {
         pPacket->Get(pPacketData->pBuffer, pPacketData->ulTS, pPacketData->unStreamNumber,
             pPacketData->unASMFlags, pPacketData->unASMRuleNumber);
-        if(m_pStreamInfo[pPacketData->unStreamNumber].pTSConverter)
+
+        if (m_bWirePayload)
+        {
+            BYTE* pBuf = pPacketData->pBuffer->GetBuffer();
+            // We have to parse out the RTP timestamp and seqno
+            // XXXJDG this should be handled in qtbcplin
+            if (pPacketData->pBuffer->GetSize() >= 12 &&
+                (pBuf[0] & 0xC0) == 0x80) // verify size and version
+            {
+                pPacketData->unSeqNo = pBuf[2] << 8 | pBuf[3];
+                pPacketData->ulRTPTS = pBuf[4] << 24 | pBuf[5] << 16 | pBuf[6] << 8 | pBuf[7];
+            }
+        }
+        else if(m_pStreamInfo[pPacketData->unStreamNumber].pTSConverter)
         {
             pPacketData->ulRTPTS =
                 m_pStreamInfo[pPacketData->unStreamNumber].pTSConverter->hxa2rtp(pPacketData->ulTS);
@@ -414,7 +410,6 @@
 IHXPacket*
 BasePacketShim::AdjustTimeStamp(IHXPacket* pPacket)
 {
-    UINT32 ulTS = 0; UINT32 ulRTPTS = 0;
     ServerRTPPacket* pNewPacket = NULL;      
     FCSPacketData packetData;
 
@@ -426,44 +421,74 @@
 	// only the very first packet's data after the switch is recorded
         RecordFirstPacket(&packetData);
         m_bFirstPacketFromSource = FALSE;
-        m_pbFirstPacketFromSwitchedSource[unStreamNum] = FALSE;
-	m_pulStartingTSPerStream[unStreamNum] = packetData.ulTS;
-    }
-    else if (m_pbFirstPacketFromSwitchedSource[unStreamNum])
-    {
-	// all subsequent streams' first packet's data is recorded
-        m_pbFirstPacketFromSwitchedSource[unStreamNum] = FALSE;
-	m_pulStartingTSPerStream[unStreamNum] = packetData.ulTS;
-	m_pulStartingRTPTSPerStream[unStreamNum] = packetData.ulRTPTS;
     }
 
     FCSStreamData* pStreamInfo = &m_pStreamInfo[unStreamNum];
+    HXBOOL bOldPacket = FALSE;
 
-    // monitor streams' TS moving past the very first packet's TS
-    // (m_ulStartingTS) after the switch
-    if (!m_pbStreamHasMovedPastStartTS[unStreamNum] && packetData.ulTS >= m_ulStartingTS)
-    {
-	m_pbStreamHasMovedPastStartTS[unStreamNum] = TRUE;
-	if (++m_unNumStreamsMovedPastStartTS >= m_unSubscribedStreamCount)
-	{
-	    m_bAllStreamsStarted = TRUE;
-	}
-    }
-    /* only after all streams have started and moved past the m_ulStartingTS
-     * can the server skip packets as described below.
-     *
-     * we are ahead of the first packet, just get another one.
-     * XXXJJ This happens in live, where packets are not in sequence. We can't pass this packet
-     * downstreams before the unsignness of timestamp.  You will get a huge number when you do the
-     * adjustment.
-     */
-    else if (packetData.ulTS < m_ulStartingTS && m_bAllStreamsStarted)
-    {
-        return NULL;
-    }
-    else if (packetData.ulRTPTS < pStreamInfo->ulStartingRTPTS && m_bAllStreamsStarted)
+    if (m_ulOldPacketCheck)
     {
-        return NULL;
+        --m_ulOldPacketCheck;
+
+        // monitor streams' TS moving past the very first packet's TS
+        // (m_ulStartingTS) after the switch
+        if (!m_pbStreamHasMovedPastStartTS[unStreamNum])
+        {
+            if (m_bWirePayload)
+            {
+                m_pStreamInfo[unStreamNum].m_usSrcSeqNo = packetData.unSeqNo;
+                bOldPacket = FALSE;
+#ifdef FCS_DEBUG 
+                printf("first packet: strm=%u: rule=%u seq=%u ts=%u rtpts=%u\n",
+                    unStreamNum, packetData.unASMRuleNumber, 
+                    packetData.unSeqNo, packetData.ulTS, packetData.ulRTPTS);
+#endif
+            }
+            else
+            {
+                bOldPacket = DiffTimeStamp(packetData.ulTS, m_ulStartingTS) < 0;
+            }
+
+            if (!bOldPacket)
+            {
+                m_pbStreamHasMovedPastStartTS[unStreamNum] = TRUE;
+	        if (++m_unNumStreamsMovedPastStartTS >= m_unSubscribedStreamCount)
+	        {
+	            m_bAllStreamsStarted = TRUE;
+	        }
+            }
+        }
+        else
+        {
+            if (m_bWirePayload)
+            {
+                bOldPacket = DiffRTPSeqNo(packetData.unSeqNo, 
+                    m_pStreamInfo[unStreamNum].m_usSrcSeqNo) < 0;
+            }
+            else
+            {
+                bOldPacket = DiffTimeStamp(packetData.ulTS, m_ulStartingTS) < 0;
+            }
+
+            /* only after all streams have started and moved past the m_ulStartingTS
+            * can the server skip packets as described below.
+            *
+            * we are ahead of the first packet, just get another one.
+            * XXXJJ This happens in live, where packets are not in sequence. We can't pass this packet
+            * downstreams before the unsignness of timestamp. You will get a huge number when you do the
+            * adjustment.
+            */
+            if (bOldPacket && m_bAllStreamsStarted)
+            {
+#ifdef FCS_DEBUG
+                printf("WARNING: skipping packet: strm=%u ts=%u seq=%u\n",
+                    unStreamNum, packetData.ulTS, packetData.unSeqNo);
+                printf("skipped_packet: start-time=%u, start-seq=%u\n",
+                    m_ulStartingTS, m_pStreamInfo[unStreamNum].m_usSrcSeqNo);
+#endif
+                return NULL;
+            }
+        }
     }
 
     //we need to strip the rtp header for wired payload.
@@ -491,19 +516,22 @@
         unStreamNum == m_ulSwitchStream && 
         ((pPacket->GetASMFlags() & HX_ASM_SWITCH_OFF) != 0);
 
-    packetData.ulTS = packetData.ulTS - m_ulStartingTS + m_ulDeliveryTimeOffset;
-    packetData.ulRTPTS = packetData.ulRTPTS - pStreamInfo->ulStartingRTPTS +
-        pStreamInfo->ulRTPTimeOffset;
-
+    packetData.ulRTPTS = pStreamInfo->ulRTPTimeOffset + 
+                        DiffTimeStamp(packetData.ulRTPTS, pStreamInfo->ulStartingRTPTS);
+        
+    packetData.ulTS = m_ulDeliveryTimeOffset + 
+                        DiffTimeStamp(packetData.ulTS, m_ulStartingTS);
     pNewPacket = new ServerRTPPacket(TRUE);
     pNewPacket->SetRTP(packetData.pBuffer, packetData.ulTS, packetData.ulRTPTS, 
             unStreamNum, packetData.unASMFlags, packetData.unASMRuleNumber);
     pNewPacket->SetMediaTimeInMs(packetData.ulTS);
 
     //update last packet info
-    pStreamInfo->ulPacketDiff = packetData.ulTS - pStreamInfo->ulLastPacketTS;
-    UINT32 ulLastPacketTS = pStreamInfo->ulLastPacketTS = packetData.ulTS;
-    pStreamInfo->ulPacketRTPDiff = packetData.ulRTPTS - pStreamInfo->ulLastPacketRTPTS;
+    pStreamInfo->ulPacketDiff = DiffTimeStamp(packetData.ulTS, 
+                                pStreamInfo->ulLastPacketTS);
+    pStreamInfo->ulLastPacketTS = packetData.ulTS;
+    pStreamInfo->ulPacketRTPDiff = DiffTimeStamp(packetData.ulRTPTS, 
+                                    pStreamInfo->ulLastPacketRTPTS);
     pStreamInfo->ulLastPacketRTPTS = packetData.ulRTPTS;
 
     m_pStreamInfo[unStreamNum].bGettingPacket = FALSE;
@@ -571,51 +599,54 @@
     {
         m_ulStartingTS = pPacketData->ulTS; 
     }
-    m_pulStartingTSPerStream[ulStreamNumber] = m_ulStartingTS;
     m_pbStreamHasMovedPastStartTS[ulStreamNumber] = FALSE;
     m_unNumStreamsMovedPastStartTS = 0;
+    m_ulOldPacketCheck = g_ulOutOfOrderPacketsCheck;
 
-    UINT32 ulHXATime;
-    if (m_pStreamInfo[ulStreamNumber].pTSConverter)
+    if (m_bWirePayload)
     {
-        ulHXATime = m_pStreamInfo[ulStreamNumber].pTSConverter->rtp2hxa_raw(ulRTPTime);
+        for (UINT16 i = 0; i < m_uNumOfStreams; i++)
+        {
+            // The offset is set for each stream according to the earliest 
+            // packet across streams
+            m_pStreamInfo[i].ulStartingRTPTS = m_pLiveSourceWrapper->GetRTPTimeOffset(i);
+            m_ulStartingTS = m_pLiveSourceWrapper->GetDeliveryTimeOffset();
+        }
     }
     else
     {
-        ulHXATime = ulRTPTime;
-    }
+        UINT32 ulHXATime;
+        if (m_pStreamInfo[ulStreamNumber].pTSConverter)
+        {
+            ulHXATime = m_pStreamInfo[ulStreamNumber].pTSConverter->rtp2hxa_raw(ulRTPTime);
+        }
+        else
+        {
+            ulHXATime = ulRTPTime;
+        }
 
-    for (UINT32 i = 0; i < m_uNumOfStreams; i++)
-    {
-	if (i == ulStreamNumber)
-	{
-	    m_pStreamInfo[i].ulStartingRTPTS = ulRTPTime;
-	    m_pulStartingRTPTSPerStream[i] = ulRTPTime;
-	}
-	else if(m_pStreamInfo[i].pTSConverter)
-	{
-	    UINT32 ulTSConvRTPTime = m_pStreamInfo[i].pTSConverter->hxa2rtp_raw(ulHXATime);
-	    m_pStreamInfo[i].ulStartingRTPTS = ulTSConvRTPTime;
-	}
-	else
-	{
-	    m_pStreamInfo[i].ulStartingRTPTS = ulHXATime;
-	}
+        for (UINT16 i = 0; i < m_uNumOfStreams; i++)
+        {
+	    if (i == ulStreamNumber)
+	    {
+	        m_pStreamInfo[i].ulStartingRTPTS = ulRTPTime;
+	    }
+	    else if(m_pStreamInfo[i].pTSConverter)
+	    {
+	        UINT32 ulTSConvRTPTime = m_pStreamInfo[i].pTSConverter->hxa2rtp_raw(ulHXATime);
+	        m_pStreamInfo[i].ulStartingRTPTS = ulTSConvRTPTime;
+	    }
+	    else
+	    {
+	        m_pStreamInfo[i].ulStartingRTPTS = ulHXATime;
+	    }
+        }
     }
 }
 
 IHXBuffer*
 BasePacketShim::HandleWirePayload(IHXBuffer* pBuffer, FCSPacketData* pPacketData)
 {
-    if(m_bGetRTPOffset == FALSE)
-    {
-        for(UINT16 i = 0; i < m_uNumOfStreams; i++)
-        {
-            m_pStreamInfo[i].m_lRTPOffset = m_pLiveSourceWrapper->GetRTPTimeOffset(i);
-        }
-        m_bGetRTPOffset = TRUE;
-    }
-
     UINT32 ulHeaderLen = 0;
     IHXBuffer* pNewBuffer = NULL;
     UCHAR* pRTPHeader = pBuffer->GetBuffer();
@@ -635,9 +666,6 @@
         ulHeaderLen += (uiRTPExtLen + 1) * 4;
     }
 
-    pPacketData->ulRTPTS = (UINT32)getlong(pRTPHeader + 4);
-    pPacketData->ulRTPTS -= m_pStreamInfo[pPacketData->unStreamNumber].m_lRTPOffset;
-
     //XXXJJ This is a hack.  We need to reconsider it after 12.0
     //Since the live source wrapper already filter out rtcp packets, here 
     // we reuse rule 1 for marker.

Index: live_source_wrapper.cpp
===================================================================
RCS file: /cvsroot/server/engine/inputsource/live_source_wrapper.cpp,v
retrieving revision 1.4.2.3.2.7
retrieving revision 1.4.2.3.2.8
diff -u -d -r1.4.2.3.2.7 -r1.4.2.3.2.8
--- live_source_wrapper.cpp	7 Oct 2008 22:22:14 -0000	1.4.2.3.2.7
+++ live_source_wrapper.cpp	23 Oct 2008 18:52:22 -0000	1.4.2.3.2.8
@@ -45,6 +45,8 @@
 #include "ntptime.h"
 #include "netbyte.h"
 #include "hxmarsh.h"
+#include "tscalc.h"
+#include "sdptools.h"
 
 CLiveSourceWrapper::CLiveSourceWrapper(IHXPSourceLivePackets* pLiveSource, 
                                         IHXCommonClassFactory* pCCF,
@@ -61,18 +63,24 @@
     , m_bFailed(FALSE)
     , m_ulRefCount(0)
     , m_bIsWirePayload(bIsWirePayload)
-    ,m_bGotRTPOffset(FALSE)
+    , m_bGotRTPOffset(FALSE)
+    , m_ulStartTime(0)
+    , m_usSyncStream(0xFFFF)
+    , m_bSyncPacketsReady(FALSE)
+    , m_bSyncStreamStarted(FALSE)
 {
     pLiveSource->AddRef();
     m_pCCF->AddRef();
     m_pStreamInfoArray = new ClientSession::StreamInfo*[m_usNumStreams];
     m_pOwedPacket = new UINT32[m_usNumStreams];
     m_ppSavedPacketList = new CHXSimpleList*[m_usNumStreams];
-    m_tsConverters = new CHXTimestampConverter*[m_usNumStreams];
+    m_RTPFrequencies = new UINT32[m_usNumStreams];
     m_rtcpPackets = new IHXPacket*[m_usNumStreams];
-    m_RTPOffsets = new INT32[m_usNumStreams];
+    m_RTPOffsets = new UINT32[m_usNumStreams];
 
-    for(UINT16 i = 0; i < m_usNumStreams; i++)
+    UINT32 ulStream = 0;
+    UINT16 i;
+    for(i = 0; i < m_usNumStreams; i++)
     {
         m_pStreamInfoArray[i] = pStreamInfoArray[i];
         if(m_pStreamInfoArray[i])
@@ -83,22 +91,52 @@
         m_ppSavedPacketList[i] = new CHXSimpleList(); 
         m_rtcpPackets[i] = NULL;
         m_RTPOffsets[i] = 0;
-        m_tsConverters[i] = NULL;
-
+        m_RTPFrequencies[i] = 1000;
         if(m_bIsWirePayload && pStreamInfoArray[i])
         {
-            if(pStreamInfoArray[i]->m_RTPFactor && pStreamInfoArray[i]->m_HXFactor)
+            if(pStreamInfoArray[i]->m_RTPFactor)
             {
-                m_tsConverters[i] = new CHXTimestampConverter(CHXTimestampConverter::FACTORS, 
-                            pStreamInfoArray[i]->m_HXFactor, 
-                            pStreamInfoArray[i]->m_RTPFactor);
+                m_RTPFrequencies[i] = pStreamInfoArray[i]->m_RTPFactor;
             }
             else if (pStreamInfoArray[i]->m_sampleRate)
             {
-                m_tsConverters[i] = new CHXTimestampConverter(CHXTimestampConverter::SAMPLES,   
-                            pStreamInfoArray[i]->m_sampleRate);
+                m_RTPFrequencies[i] = pStreamInfoArray[i]->m_sampleRate;
+            }
+        }
+
+        IHXBuffer* pMime = NULL;
+        IHXValues* pHeader = pStreamInfoArray[i] ?  
+                    pStreamInfoArray[i]->m_pActualHeader : NULL;
+
+        if(pHeader && SUCCEEDED(pHeader->GetPropertyCString("MimeType", pMime))
+            && SDPMapMimeToMediaType((const char*)pMime->GetBuffer()) ==
+            RTSPMEDIA_TYPE_AUDIO)
+        {
+            pHeader->GetPropertyULONG32("StreamNumber", ulStream);
+            m_usSyncStream = (UINT16)ulStream;
+        }
+    }
+
+    // If there's no audio stream, just sync to whatever stream is setup
+    if (m_usSyncStream == 0xFFFFFFFF)
+    {
+        for (i = 0; i < m_usNumStreams; i++)
+        {
+            if (pStreamInfoArray[i] && pStreamInfoArray[i]->m_bSetupReceived && 
+                pStreamInfoArray[i]->m_pActualHeader)
+            {
+                pStreamInfoArray[i]->m_pActualHeader->GetPropertyULONG32(
+                    "StreamNumber", ulStream);
+                m_usSyncStream = (UINT16)ulStream;
             }
         }
+
+    }
+
+    if (m_usSyncStream == 0xFFFF)
+    {
+        HX_ASSERT(FALSE);
+        m_usSyncStream = 0;
     }
 }
 
@@ -117,16 +155,12 @@
             pPacket->Release();
         }
         HX_DELETE(m_ppSavedPacketList[i]);
-
         HX_RELEASE(m_pStreamInfoArray[i]);
-
         HX_RELEASE(m_rtcpPackets[i]);
-
-        HX_DELETE(m_tsConverters[i]);
     }
 
     HX_DELETE(m_rtcpPackets);
-    HX_DELETE(m_tsConverters);
+    HX_DELETE(m_RTPFrequencies);
     HX_DELETE(m_RTPOffsets);
 
     HX_RELEASE(m_pLiveSource);
@@ -140,6 +174,30 @@
 
 }
 
+inline BOOL CLiveSourceWrapper::IsPacketSubscribed( UINT16 usStreamNumber,
+                                                    UINT16 usRuleNumber)
+{
+    //in qtbcplin the rtcp rules are hardcoded to 1
+    //XXXJJ unless we want to parse the whole asm rule book to get rtcp rule
+    //this appears to be a good workaround.  We can't use pTransport->GetRTCPRule
+    //because this may be from a new source.
+    if(m_bIsWirePayload && usRuleNumber == 1)
+    {
+        return FALSE;
+    }
+
+    if(usStreamNumber >= m_usNumStreams)
+    {
+        return FALSE;
+    }
+
+    if(m_pStreamInfoArray[usStreamNumber])
+    {
+        return m_pStreamInfoArray[usStreamNumber]->m_pbRuleOn[usRuleNumber];
+    }
+    return FALSE;
+}
+
 HX_RESULT CLiveSourceWrapper::StartLivePacket()
 {
     IHXLivePacketBufferQueue* pQueue = NULL;
@@ -161,7 +219,7 @@
             {
                 if(pStreamInfo->m_pbRuleOn[j])
                 {
-                        pPacketBufferProvider->GetPacketBufferQueue(i, j, pQueue);
+                    pPacketBufferProvider->GetPacketBufferQueue(i, j, pQueue);
                 }
 
                 if(pQueue)
@@ -182,7 +240,7 @@
     m_pRSDQueue = new CRSDPacketQueue(1000);
     if(pQueue)
     {
-        m_bNeedToHandleSyncPackets = TRUE;
+        //m_bNeedToHandleSyncPackets = TRUE;
         m_pRSDQueue->AddPacketBufferQueue(pQueue);
         HX_RELEASE(pQueue);
     }
@@ -209,77 +267,224 @@
      return theErr;
 }
 
-INT32 CLiveSourceWrapper::GetRTPTimeOffset(UINT16 usStrm)
+UINT32 CLiveSourceWrapper::GetRTPTimeOffset(UINT16 usStrm)
 {
     return m_RTPOffsets[usStrm];
 }
 
-void CLiveSourceWrapper::CalculateRTPTimeOffset()
+UINT32 CLiveSourceWrapper::GetDeliveryTimeOffset()
 {
+    return m_ulStartTime;
+}
+
+HX_RESULT 
+CLiveSourceWrapper::CalculateRTPTimeOffset()
+{
+    UINT32 ulRTPStartTime = 0;
+    IHXPacket* pPacket = NULL;
+    IHXBuffer* pBuffer = NULL;
+    BYTE* pcBuf = NULL;
+
+    // Get the RTP time of the first packet on the sync stream
+    // that will be the first packet, thus the offset
+    pPacket = (IHXPacket*)m_ppSavedPacketList[m_usSyncStream]->GetHead();
+    pBuffer = pPacket->GetBuffer();
+    pcBuf = pBuffer->GetBuffer();
+
+    if(pBuffer->GetSize() < 12 || !pcBuf || (pcBuf[0] & 0xC0) != 0x80)
+    {
+        // bad packet!!
+        HX_RELEASE(pBuffer);
+        return HXR_FAIL;
+    }
+
+    m_ulStartTime = pPacket->GetTime();
+
+    // Get the RTP timestamp from the packet
+    ulRTPStartTime = ntohl(*(UINT32*)(pcBuf+4));
+    HX_RELEASE(pBuffer);
+
+    if (m_usNumStreams == 1)
+    {
+        // If there is only one stream, no need to deal with
+        // all the sync logic, just set the times and be done
+        m_RTPOffsets[0] = ulRTPStartTime;
+        m_ulStartTime = pPacket->GetTime();
+
+        HX_RELEASE(pBuffer);
+        return HXR_OK;
+    }
+
     NTPTime* pNTPTimesFromSR = new NTPTime[m_usNumStreams];
     UINT32* pRTPtimesFromSR = new UINT32[m_usNumStreams];
-    
+    Timeval tvStrmOffset (0, 0);
+
     UINT16 i = 0;
-    NTPTime earliest(0, 0);
+
+    // Get NTP and RTP-TS times from all the RTCP-SRs
     for(i = 0; i < m_usNumStreams; i++)
     {
-        IHXBuffer* pBuffer = m_rtcpPackets[i]->GetBuffer();
+        pBuffer = m_rtcpPackets[i]->GetBuffer();
         if(pBuffer == NULL || pBuffer->GetSize() < 12)
         {
             //bail out
             HX_DELETE(pNTPTimesFromSR);
             HX_DELETE(pRTPtimesFromSR);
             HX_RELEASE(pBuffer);
-            return;
+            return HXR_FAIL;
         }
 
-        BYTE* pcRTCP = pBuffer->GetBuffer();
-        pcRTCP += 8;
-
-        /* Truncate NTP time to 32 bits from 64 bits to make room for expansion to milliseconds*/
-        pNTPTimesFromSR[i].m_ulSecond = (ntohl(*(unsigned int*)(pcRTCP))) & 0x0000ffff;
-        pNTPTimesFromSR[i].m_ulFraction = (ntohl(*(unsigned int*)(pcRTCP+4))) & 0xffff0000;
+        // Get the clock time from the SR
+        BYTE* pcBuf = pBuffer->GetBuffer();
+        pcBuf += 8;
 
-        if(i == 0)
-        {
-            earliest = pNTPTimesFromSR[i];
-        }
-        else if(earliest.toMSec() > pNTPTimesFromSR[i].toMSec())
-        {
-            earliest = pNTPTimesFromSR[i];
-        }
+        pNTPTimesFromSR[i].m_ulSecond = ntohl(*(UINT32*)(pcBuf));
+        pNTPTimesFromSR[i].m_ulFraction = ntohl(*(UINT32*)(pcBuf+4));
 
-        pRTPtimesFromSR[i] = ntohl(*(unsigned int*)(pcRTCP+8));
+        // Get the RTP time from the SR, convert to Timeval
+        pRTPtimesFromSR[i] = ntohl(*(UINT32*)(pcBuf+8));
         HX_RELEASE(pBuffer);
     }
 
+    // Get the offset from the sync stream's SR to the 
+    // packet start time
+    // Mind the units - calculate with appropriate RTP timestamp
+    // rollover, then convert to Timeval for proper handling with
+    // other streams
+    INT32 lRTPPktSROffset = DiffTimeStamp(ulRTPStartTime, 
+                            pRTPtimesFromSR[m_usSyncStream]);
+    Timeval tvPktSROffset ((double)lRTPPktSROffset / 
+                            (double)m_RTPFrequencies[m_usSyncStream]);
 
-    for(i = 0; i < m_usNumStreams; i++)
+    for (i = 0; i < m_usNumStreams; i++)
     {
-        m_RTPOffsets[i] = pRTPtimesFromSR[i] - 
-            m_tsConverters[i]->hxa2rtp(pNTPTimesFromSR[i].toMSec() - earliest.toMSec());
+        if (i == m_usSyncStream)
+        {
+            m_RTPOffsets[i] = ulRTPStartTime;
+        }
+        else
+        {
+            // Get the offset from the stream's SR to the start time. 
+            // Can be negative! But watch the NTP handling, as NTP
+            // is unsigned
+            tvStrmOffset = tvPktSROffset;
+            if (pNTPTimesFromSR[i] >= pNTPTimesFromSR[m_usSyncStream])
+            {
+                tvStrmOffset -= (pNTPTimesFromSR[i] - 
+                    pNTPTimesFromSR[m_usSyncStream]).toTimeval();
+            }
+            else
+            {
+                tvStrmOffset += (pNTPTimesFromSR[m_usSyncStream] - 
+                    pNTPTimesFromSR[i]).toTimeval();
+            }
+
+            // and now add this to the SR RTP time (converted) to 
+            // get the stream time at the packet start time
+            tvStrmOffset += (Timeval)((double)pRTPtimesFromSR[i] /
+                        (double)m_RTPFrequencies[i]);
+
+            // and convert back to RTP TS units
+            m_RTPOffsets[i] = ConvertToTimestamp(tvStrmOffset, 
+                                (INT32)m_RTPFrequencies[i]);
+            
+        }
+#ifdef FCS_DEBUG
+        printf("syncing: stream=%u RTP_offset=%u\n", i, m_RTPOffsets[i]);
+#endif 
     }
 
-   HX_DELETE(pNTPTimesFromSR);
-   HX_DELETE(pRTPtimesFromSR);
+#ifdef FCS_DEBUG
+    printf("synching: delivery time offset = %u\n", m_ulStartTime);
+#endif
 
+    HX_DELETE(pNTPTimesFromSR);
+    HX_DELETE(pRTPtimesFromSR);
+
+    return HXR_OK;
 }
 
-BOOL CLiveSourceWrapper::HaveAllSRs(IHXPacket* pPacket)
+BOOL CLiveSourceWrapper::HaveAllSyncData(IHXPacket* pPacket)
 {
     UINT16 usRule = pPacket->GetASMRuleNumber();
     UINT16 usStrm = pPacket->GetStreamNumber();
-    if(usRule != 1) //rtcp rule
-    {
-        return FALSE;
-    }
-    else
+
+    if(usRule == 1) //rtcp rule
     {
+        // Get an SR on each stream
         HX_RELEASE(m_rtcpPackets[usStrm]);
         m_rtcpPackets[usStrm] = pPacket;
         HX_ADDREF(pPacket);
+#ifdef FCS_DEBUG
+        printf("pre-sync: SR received stream=%u, ts=%u\n", 
+            usStrm, pPacket->GetTime());
+#endif
+    }
+    else if (m_bSyncPacketsReady)
+    {
+        // We already took care of the data packets (we must just 
+        // be waiting for SRs) so no we just want to store everything
+        pPacket->AddRef();
+        m_ppSavedPacketList[usStrm]->AddTail(pPacket);
+#ifdef FCS_DEBUG
+        printf("pre-sync: storing packet strm=%u ts=%u\n", 
+                usStrm, pPacket->GetTime());
+#endif
+    }
+    else if (usStrm == m_usSyncStream)
+    {
+        // This is a sync stream packet and we haven't yet seen any 
+        // non-sync packets, so store it at the top
+        while (!m_ppSavedPacketList[m_usSyncStream]->IsEmpty())
+        {
+            IHXPacket* pTmp = (IHXPacket*)m_ppSavedPacketList[usStrm]->RemoveHead();
+            HX_RELEASE(pTmp);
+        }
+
+        pPacket->AddRef();
+        m_ppSavedPacketList[usStrm]->AddTail(pPacket);
+#ifdef FCS_DEBUG
+        printf("pre-sync: new sync packet, stream=%u, ts=%u\n",
+                usStrm, pPacket->GetTime());
+#endif
+    }
+    else if (!m_ppSavedPacketList[m_usSyncStream]->IsEmpty())
+    {
+        // This is a non-sync stream packet and we already have a
+        // sync stream packet. 
+        UINT16 usASMFlags = pPacket->GetASMFlags();
+        if ((usASMFlags & HX_ASM_SWITCH_ON) && !(usASMFlags & HX_ASM_SIDE_EFFECT))
+        {
+            // If this is a key frame, check if it really is later 
+            // than the sync packet
+            pPacket->AddRef();
+            m_ppSavedPacketList[usStrm]->AddTail(pPacket);
+            m_bSyncPacketsReady = TRUE;
+#ifdef FCS_DEBUG
+            printf("pre-sync: key frame stream=%u ts=%u\n", 
+                    usStrm, pPacket->GetTime());
+        }
+        else
+        {
+            printf("pre-sync: tossing non-key-frame stream=%u ts=%u\n",
+                usStrm, pPacket->GetTime());
+#endif
+        }
+        
     }
+#ifdef FCS_DEBUG
+    else
+    {
+        printf("pre-sync: tossing non-sync packet stream=%u ts=%u rule=%u\n",
+            usStrm, pPacket->GetTime(), usRule);
+    }
+#endif
 
+    if (!m_bSyncPacketsReady)
+    {
+        return FALSE;
+    }
+    
     for(UINT16 i = 0; i < m_usNumStreams; i++)
     {
         if(m_rtcpPackets[i] == NULL)
@@ -288,8 +493,76 @@
         }
     }
     return TRUE;
+}
+
+
+HX_RESULT
+CLiveSourceWrapper::SyncRTPStartTimes()
+{
+    HX_RESULT rc = HXR_OK;
+    IHXPacket* pPacket = NULL;
 
+    while(m_bGotRTPOffset == FALSE && SUCCEEDED(rc))
+    {
+        rc = m_pRSDQueue->GetPacket(pPacket);
+        if(SUCCEEDED(rc))
+        {
+            m_bGotRTPOffset = HaveAllSyncData(pPacket);
+
+            HX_RELEASE(pPacket);
+        }
+    }
+
+    if(m_bGotRTPOffset)
+    {
+        rc = CalculateRTPTimeOffset();
+
+        if (SUCCEEDED(rc))
+        {
+            // Fulfill pending GetPackets that we have saved or queued
+            SendPendingPackets();
+        }
+        else
+        {
+            m_pPacketSink->PacketReady(rc, NULL);
+        }
+    }
+
+    return HXR_OK;
 }
+
+void
+CLiveSourceWrapper::SendPendingPackets()
+{
+    HX_RESULT rc = HXR_OK;
+    IHXPacket* pPacket = NULL;
+
+    if (!m_bSyncStreamStarted)
+    {
+        return;
+    }
+    
+    // Make sure we go through all streams but start with
+    // the sync stream
+    UINT16 i,j;
+    for(j = 0; j < m_usNumStreams; j++)
+    {
+        i = (j + m_usSyncStream) % m_usNumStreams;
+        while (m_pOwedPacket[i])
+        {
+            pPacket = FindPacket(i);
+            --m_pOwedPacket[i];
+#ifdef FCS_DEBUG
+            printf("post-sync: sending owed packet stream=%u rule=%u ts=%u\n",
+                pPacket->GetStreamNumber(), pPacket->GetASMRuleNumber(),
+                pPacket->GetTime());
+#endif 
+            m_pPacketSink->PacketReady(rc, pPacket);
+            HX_RELEASE(pPacket);
+        }
+    }
+}
+
 STDMETHODIMP
 CLiveSourceWrapper::GetPacket(UINT16 unStreamNumber)
 {
@@ -303,47 +576,44 @@
         m_bStarted = TRUE;
         StartLivePacket();
     }
+    if (!m_bSyncStreamStarted && unStreamNumber == m_usSyncStream)
+    {
+        m_bSyncStreamStarted = TRUE;
+    }
     HX_ASSERT(unStreamNumber < m_usNumStreams);
 
-    HX_RESULT rc = HXR_OK;
     IHXPacket* pPacket = NULL;
 
     //XXXJJ we need to calculate the rtp offset before we can handle 
     // downstream the packets
     if(m_bIsWirePayload && m_bGotRTPOffset == FALSE)
     {
-        BOOL bHaveAllSRs = FALSE;
-        while(bHaveAllSRs == FALSE)
-        {
-            rc = m_pRSDQueue->GetPacket(pPacket);
-            if(SUCCEEDED(rc))
-            {
-                bHaveAllSRs = HaveAllSRs(pPacket);
-                if(bHaveAllSRs)
-                {
-                    break;
-                }
-            }
-            else
-            {
-                break;
-            }
-            HX_RELEASE(pPacket);
-        }
-        HX_RELEASE(pPacket);
-
-        if(bHaveAllSRs)
-        {
-            CalculateRTPTimeOffset();
-            m_bGotRTPOffset = TRUE;
-        }
-        else
-        {
-            m_pOwedPacket[unStreamNumber]++;
-            return HXR_OK;
-        }
+        m_pOwedPacket[unStreamNumber]++;
+        return SyncRTPStartTimes();
+    }
 
+    pPacket = FindPacket(unStreamNumber);
+    if (pPacket)
+    {
+        m_pPacketSink->PacketReady(HXR_OK, pPacket);
+        pPacket->Release();
     }
+    else
+    {
+        // we are waiting for a PacketReady on this stream
+        ++(m_pOwedPacket[unStreamNumber]);
+    }
+
+    return HXR_OK;
+}
+
+IHXPacket*
+CLiveSourceWrapper::FindPacket(UINT16 unStreamNumber)
+{
+    IHXPacket* pPacket = NULL;
+    HX_RESULT rc = HXR_OK;
+    UINT16 usRuleNum = 0;
+    UINT16 usPktStrmNum = 0;
 
     //check from the saved list first;
     if(!m_ppSavedPacketList[unStreamNumber]->IsEmpty())
@@ -354,38 +624,33 @@
     while(pPacket == NULL)
     {
         rc = m_pRSDQueue->GetPacket(pPacket);
+
         if(FAILED(rc))
         {
             //no packet available at the current time
-            m_pOwedPacket[unStreamNumber]++;
-            return HXR_OK;
+            return NULL;
         }
 
-        UINT16 usRuleNum = pPacket->GetASMRuleNumber();
-        UINT16 usPktStrmNum = pPacket->GetStreamNumber();
+        usRuleNum = pPacket->GetASMRuleNumber();
+        usPktStrmNum = pPacket->GetStreamNumber();
 
         if(!IsPacketSubscribed(usPktStrmNum, usRuleNum))
         {
             //not subscribed
             HX_RELEASE(pPacket);
-            continue;
         }
 
-        if(usPktStrmNum != unStreamNumber)
+        else if(usPktStrmNum != unStreamNumber)
         {
             //not the packet asked for, save it and serve it later
             m_ppSavedPacketList[usPktStrmNum]->AddTail(pPacket);
             //We don't release here, the refcount is taken by the list.
             pPacket = NULL;
-            continue;
         }
-        //we get the packet if reach here
+        // else we get the packet if reach here
     }
 
-
-    m_pPacketSink->PacketReady(HXR_OK, pPacket);
-    HX_RELEASE(pPacket);
-    return HXR_OK;
+    return pPacket;
 }
 
 STDMETHODIMP
@@ -398,23 +663,20 @@
     }
 
     UINT16 usPktStrmNum = pPacket->GetStreamNumber();
+    UINT16 usRuleNum = pPacket->GetASMRuleNumber();
 
     if(m_bIsWirePayload && m_bGotRTPOffset == FALSE)
     {
-        if(HaveAllSRs(pPacket) == TRUE)
-        {
-            CalculateRTPTimeOffset();
-            m_bGotRTPOffset = TRUE;
-        }
-        else
+        HX_RESULT rc = m_pRSDQueue->AddPacket(pPacket);
+        if (SUCCEEDED(rc))
         {
-            return HXR_OK;
+            rc = SyncRTPStartTimes();
         }
+        return rc;
     }
         
     if(m_pOwedPacket[usPktStrmNum] > 0)
     {
-        UINT16 usRuleNum = pPacket->GetASMRuleNumber();
         if(IsPacketSubscribed(usPktStrmNum, usRuleNum))
         {
             m_pOwedPacket[usPktStrmNum]--;
@@ -426,29 +688,6 @@
     return m_pRSDQueue->AddPacket(pPacket);
 }
 
-inline BOOL CLiveSourceWrapper::IsPacketSubscribed( UINT16 usStreamNumber,
-                                                    UINT16 usRuleNumber)
-{
-    //in qtbcplin the rtcp rules are hardcoded to 1
-    //XXXJJ unless we want to parse the whole asm rule book to get rtcp rule
-    //this appears to be a good workaround.  We can't use pTransport->GetRTCPRule
-    //because this may be from a new source.
-    if(m_bIsWirePayload && usRuleNumber == 1)
-    {
-        return FALSE;
-    }
-
-    if(usStreamNumber >= m_usNumStreams)
-    {
-        return FALSE;
-    }
-
-    if(m_pStreamInfoArray[usStreamNumber])
-    {
-        return m_pStreamInfoArray[usStreamNumber]->m_pbRuleOn[usRuleNumber];
-    }
-    return FALSE;
-}
 
 BOOL CLiveSourceWrapper::IsPayloadWirePacket()
 {
@@ -505,3 +744,4 @@
     *ppvObj = NULL;
     return HXR_NOINTERFACE;
 }
+
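
The stream ordering used by SendPendingPackets() in the hunk above (visit every stream once, starting at the sync stream and wrapping around) can be looked at in isolation. The following is only a minimal sketch: StreamVisitOrder is a hypothetical helper that does not exist in the patch, and it reproduces just the index arithmetic, not the packet delivery.

```cpp
#include <cstdint>
#include <vector>

// Hypothetical helper, not part of the patch: returns the order in which
// streams are visited when iteration starts at syncStream and wraps around.
std::vector<uint16_t> StreamVisitOrder(uint16_t numStreams, uint16_t syncStream)
{
    std::vector<uint16_t> order;
    for (uint16_t j = 0; j < numStreams; j++)
    {
        // Same index arithmetic as the loop in SendPendingPackets().
        order.push_back(static_cast<uint16_t>((j + syncStream) % numStreams));
    }
    return order;
}
```

With three streams and stream 1 as the sync stream, this yields the order 1, 2, 0, so owed packets on the sync stream are always served before those on the other streams.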

Index: Umakefil
===================================================================
RCS file: /cvsroot/server/engine/inputsource/Umakefil,v
retrieving revision 1.11.8.1.4.1
retrieving revision 1.11.8.1.4.2
diff -u -d -r1.11.8.1.4.1 -r1.11.8.1.4.2
--- Umakefil	28 Mar 2008 04:31:42 -0000	1.11.8.1.4.1
+++ Umakefil	23 Oct 2008 18:52:22 -0000	1.11.8.1.4.2
@@ -47,6 +47,8 @@
     , "common/runtime/pub"
     , "common/fileio/pub"
     , "common/system/pub/globals"
+    , "protocol/common/util/pub"
+    , "protocol/transport/common/system/pub"
     , "server/include"
     , "server/common/struct/pub"
     , "server/common/util/pub"
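
For readers following the FindPacket() refactoring in live_source_wrapper.cpp above: the function first serves from the per-stream saved list, then drains the shared queue, parking packets for other streams until they are asked for. The sketch below illustrates that pattern under simplifying assumptions. Packet and Demux are hypothetical stand-ins (not IHXPacket or CLiveSourceWrapper), reference counting is omitted, and an out-of-range stream number stands in for the IsPacketSubscribed() check.

```cpp
#include <cstddef>
#include <cstdint>
#include <deque>
#include <vector>

// Hypothetical stand-in for IHXPacket: only the fields the sketch needs.
struct Packet
{
    uint16_t stream;
    uint32_t ts;
};

// Hypothetical stand-in for the saved-list / queue handling in FindPacket().
class Demux
{
public:
    explicit Demux(uint16_t numStreams) : m_saved(numStreams) {}

    // Models a packet arriving on the shared queue.
    void Push(const Packet& p) { m_queue.push_back(p); }

    // Returns true and fills 'out' if a packet for 'stream' is available.
    // A false return corresponds to FindPacket() returning NULL, in which
    // case the caller would bump its owed-packet count for that stream.
    bool Find(uint16_t stream, Packet& out)
    {
        if (stream >= m_saved.size())
        {
            return false;
        }
        // Check the saved list first.
        if (!m_saved[stream].empty())
        {
            out = m_saved[stream].front();
            m_saved[stream].pop_front();
            return true;
        }
        while (!m_queue.empty())
        {
            Packet p = m_queue.front();
            m_queue.pop_front();
            if (p.stream >= m_saved.size())
            {
                continue; // dropped, like an unsubscribed packet
            }
            if (p.stream == stream)
            {
                out = p;
                return true;
            }
            // Not the stream asked for: save it and serve it later.
            m_saved[p.stream].push_back(p);
        }
        return false;
    }

private:
    std::vector<std::deque<Packet> > m_saved;
    std::deque<Packet> m_queue;
};
```

Returning "not found" instead of touching the owed count inside the lookup mirrors the design choice in the patch, where GetPacket() and SendPendingPackets() each decide how to account for a missing packet.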


_______________________________________________
Server-cvs mailing list
Server-cvs@helixcommunity.org
http://lists.helixcommunity.org/mailman/listinfo/server-cvs

