"Alun Jones" <a...@texis.com> wrote in message
news:OesM9.376$j_5.279662637@newssvr11.news.prodigy.com...
> In article <#P5i926pCHA.2756@TK2MSFTNGP09>, "Simon Cooke"
> <simonco...@earthlink.net> wrote:
> >I assure you, TCP is indeed made up out of packets of data. Maybe not at
> >the API level, but underneath it is. And many people (myself included)
> >build packet (de)serialization routines on top of TCP because TCP gives
> >you guaranteed transmission of data without having to roll it *all*
> >yourself on top of UDP.
> However, it also gives you the effect of rolling everything into a stream.
> You send a hundred "packets" of fifty bytes apiece, and you don't get a
> hundred "packets" of fifty bytes apiece, you get a stream of five thousand
> bytes. It's broken up and re-assembled at any number of places along the
> path. You'll probably end up with one recv() call of the first fifty bytes,
> then another three of about 1500, and finally a recv() of ~450 bytes.
> Probably. Of course, you'll also get any number of other possibilities going
> on, just so long as it adds up to the right amount of data in the end.
> >No errors, no nothing. Just weird transmission issues. Both systems just
> >sit and pull data out of their read buffers continuously, and the load
> >and amount of data transmitted is low, so there shouldn't be an issue.
> The mention of "data loss" along with "TCP is packets" is a frequent
> combination, and it's almost always wrong. When data loss occurs, that's a
> _serious_ bug in TCP/IP. It's not something that people wouldn't have
> noticed, because web sites wouldn't download, zip files would be corrupt, etc,
> etc. It'd be roughly akin to having molecular bonds lose all their cohesion -
> catastrophic, even in relatively small quantities. So, it's _highly_ unlikely
> that data loss is occurring. Then we go to the likely occurrences here.
> You've set buffer sizes to zero. That's a really dangerous thing to do,
> particularly if you don't understand what you're doing. You also haven't run
> a network monitor to see what's actually going across the wire.
No -- I've set the send buffer size to zero, which is perfectly acceptable
when using overlapped I/O.
> What I see often reported as "data loss" is the following:
> sender calls send() repeatedly with, say fifty bytes in each send.
> receiver calls recv() repeatedly, and expects to receive fifty bytes in each
> receive.
> receiver either gets a smaller amount, or a larger amount, and gets out of
> sync with the sender.
> developer asserts that data has been lost, without realising that it is his
> code that is losing the data, not the stack or the network.
Well, let's see:
Here's my client-side packet writer. It's quite simple. Socket is a
simple wrapper around the Winsock API which carries the socket handle
with it. Event is a simple wrapper around a Win32 event object.
Synchronizable wraps a Win32 critical section object, whereas
Synchronize is a class which performs a lock on creation, and an
unlock on destruction. CThreadImpl is an object-based wrapper around
Win32 threading, with thunking.
A ClientNetwork object is created, which performs the connection and
host lookup (that bit of code needs debugging right now; it's faulty --
the actual network code is solid).
I still get packets (my implementation) delayed by a considerable amount
somewhere, under certain conditions. The ethernet connection is fine; no
packet retransmissions. netstat stats also appear to be fine.
Timestamping everything gives packets arriving typically within 1ms of
being sent. Except for this one lockup condition which I thought I'd
gotten rid of, where everything just slows down and new packets entering
the queue (say, because of user commands) only arrive at the remote
system after 3 or so other commands have been pushed into the queue.
It's plain bizarre.
class PacketQueue : public Synchronizable
{
public:
class PACKET {
public:
BYTE* pData;
size_t len;
PACKET() : pData(NULL), len(0)
{
}
void Delete()
{
if (pData != NULL)
{
delete[] pData;
pData = NULL;
}
}
};
private:
deque<PACKET> queue;
static void DeletePacket(PACKET& p);
public:
Event dataAvailable;
PacketQueue()
{
dataAvailable.Create();
}
~PacketQueue()
{
if (!queue.empty()) for_each(queue.begin(), queue.end(),
DeletePacket);
}
size_t GetCount()
{
Synchronize to(*this);
return queue.size();
}
void Clear()
{
Synchronize to(*this);
dataAvailable.ResetEvent();
if (!queue.empty()) {
for_each(queue.begin(), queue.end(), DeletePacket);
}
queue.clear();
}
void Add(BYTE* pData, size_t len)
{
{
Synchronize to(*this);
PACKET p;
p.pData = pData;
p.len = len;
if (*(reinterpret_cast<MessageType*>(pData)) == MsgMeasurementRun)
{
Packet_MeasurementRun* pmr =
reinterpret_cast<Packet_MeasurementRun*>(pData);
pmr->queueTime = GetTimeStamp();
}
queue.push_back(p);
}
dataAvailable.SetEvent();
}
bool IsEmpty()
{
Synchronize to(*this);
return queue.empty();
}
bool Get(PACKET& packet)
{
{
Synchronize to(*this);
if (queue.empty()) return false;
if (queue.size() == 1)
{
dataAvailable.ResetEvent();
}
packet = queue.front();
queue.pop_front();
}
return true;
}
};
/// Worker that takes packets off a PacketQueue and writes them to a socket
/// using overlapped WSASend, one packet at a time.
///
/// The loop exits when the shared cancel event is signalled or a socket error
/// occurs. Each packet buffer fetched from the queue is released by this class
/// once it has been sent in full or the send is abandoned.
class ClientNetworkWriter : public CThreadImpl<ClientNetworkWriter>
{
    Event& cancelOps;       // signalled by the owner to stop the writer
    PacketQueue& queue;     // source of outgoing packets
    WSAOVERLAPPED ov;       // reused for every WSASend; hEvent is `completion`
    Event completion;       // waited on for each overlapped send
    Socket out;
public:
    /// @param cancel      event that aborts all waits when signalled.
    /// @param packetqueue queue that outgoing packets are taken from.
    ClientNetworkWriter(Event& cancel, PacketQueue& packetqueue) :
        cancelOps(cancel), queue(packetqueue)
    {
        // Zero the whole structure before use instead of leaving every field
        // except hEvent uninitialized.
        memset(&ov, 0, sizeof(ov));
        completion.Create();
        ov.hEvent = completion;
    }

    ~ClientNetworkWriter()
    {
    }

    /// Attaches the writer to `outsocket`, detaching from any socket it
    /// currently holds first.
    void Initialize(Socket& outsocket)
    {
        if (out.IsValid()) out.Detach();
        out.Attach(outsocket);
    }

    /// Runs the send loop, then detaches from the socket.
    /// @return the value returned by RunNetworkSend().
    DWORD Run()
    {
        DWORD result = RunNetworkSend();
        //DEBUG: OutputDebugString("ClientNetworkWriter has exited\n");
        out.Detach();
        return result;
    }

    /// Queues an initial time-sync packet, then sends queued packets until
    /// cancelled or until the socket fails.
    ///
    /// @return WSAECANCELLED when the cancel event fired, WSAENOTCONN when a
    ///         send completed with zero bytes, otherwise the Winsock error
    ///         code of the failing call.
    DWORD RunNetworkSend()
    {
        // Add an outgoing time-sync packet.
        Packet_SyncTimeStamps pst;
        ::GetSystemTime(&(pst.systime));
        pst.masterclock = ::GetTickCount();
        BYTE* pData = new BYTE[sizeof(Packet_SyncTimeStamps)];
        memcpy(pData, &pst, sizeof(Packet_SyncTimeStamps));
        queue.Add(pData, sizeof(Packet_SyncTimeStamps));

        while (true)
        {
            if (!WaitForDataOrCancel()) return WSAECANCELLED;

            PacketQueue::PACKET p;
            // The event can be signalled while the queue is empty; in that
            // case there is nothing to send, so go back to waiting.
            if (!queue.Get(p)) continue;

            BYTE* pTemp = p.pData;
            size_t remaining = p.len;
            // A single send may transfer only part of the buffer; keep going
            // until the whole packet has been written.
            while (remaining > 0)
            {
                WSABUF wb;
                wb.buf = reinterpret_cast<char*>(pTemp);
                wb.len = static_cast<ULONG>(remaining);
                DWORD unused;
                if (out.WSASend(&wb, 1, &unused, 0, &ov, NULL) == SOCKET_ERROR)
                {
                    // Read the error once, before any cleanup runs.
                    const int err = ::WSAGetLastError();
                    if (err != WSA_IO_PENDING)
                    {
                        p.Delete();
                        return static_cast<DWORD>(err);
                    }
                }
                if (!WaitForCompletionOrCancel())
                {
                    // Cancelled mid-send: abort the outstanding I/O and wait
                    // for it to finish before freeing the buffer, then stop.
                    // Continuing would touch the buffer after it was freed.
                    CleanUp();
                    p.Delete();
                    return WSAECANCELLED;
                }
                DWORD count, flags;
                if (!::WSAGetOverlappedResult(out, &ov, &count, TRUE, &flags))
                {
                    const int err = ::WSAGetLastError();
                    p.Delete();
                    return static_cast<DWORD>(err);
                }
                if (count == 0) {
                    p.Delete();
                    return WSAENOTCONN; // connection closed.
                }
                remaining -= count;
                pTemp += count;
            }
            p.Delete();
        }
    }

    /// Cancels pending I/O on the socket and blocks until the completion
    /// event is signalled.
    void CleanUp()
    {
        ::CancelIo((HANDLE)(SOCKET)out);
        ::WaitForSingleObject(completion, INFINITE);
    }

    /// Blocks until the queue signals data or the cancel event fires.
    /// @return true only when the wake-up was the data event; cancel takes
    ///         priority when both are signalled because it is listed first.
    bool WaitForDataOrCancel()
    {
        HANDLE events[2];
        events[0] = cancelOps;
        events[1] = queue.dataAvailable;
        return (::WaitForMultipleObjects(2, events, FALSE, INFINITE) ==
            WAIT_OBJECT_0 + 1);
    }

    /// Blocks until the overlapped send completes or the cancel event fires.
    /// @return true only when the wake-up was the completion event.
    bool WaitForCompletionOrCancel()
    {
        HANDLE events[2];
        events[0] = cancelOps;
        events[1] = completion;
        return (::WaitForMultipleObjects(2, events, FALSE, INFINITE) ==
            WAIT_OBJECT_0 + 1);
    }
};
class NetworkReader : public CThreadImpl<NetworkReader>
{
Socket in;
Event& cancelOps;
Event completion;
WSAOVERLAPPED ov;
PacketQueue& queue;
BYTE* pCurrentPacket; //< the packet we're currently building
BYTE* pCurrentPacketIndex; //< offset into the current packet.
size_t packet_left; //< data left in the current packet
size_t packetsize;
Buffer readBuffer;
bool pendingRead;
bool gotHeader;
// Disable copy constructor
NetworkReader(const NetworkReader& netw);
public:
NetworkReader(Event& cancel, PacketQueue& packetqueue) :
cancelOps(cancel), queue(packetqueue),
pCurrentPacket(NULL), packet_left(0),
packetsize(0), pendingRead(false), gotHeader(false)
{
completion.Create();
ov.hEvent = completion;
}
~NetworkReader()
{
}
void Initialize(Socket& readsocket)
{
// Allocate a buffer to read data into
readBuffer.AllocateBuffer();
// Attach to read socket
if (in.IsValid()) in.Detach();
in.Attach(readsocket);
// Clear the current packet and packet queue.
pCurrentPacket = NULL;
pCurrentPacketIndex = NULL;
packet_left = 0;
packetsize = 0;
queue.Clear();
// Reset the completion event status
completion.ResetEvent();
pendingRead = false;
gotHeader = false;
}
DWORD Run()
{
DWORD result = RunNetworkRead();
//DEBUG: OutputDebugString("NetworkReader has exited\n");
CleanUp();
in.Detach();
readBuffer.FreeBuffer();
pendingRead = false;
gotHeader = false;
return result;
}
private:
DWORD RunNetworkRead()
{
while (true)
{
// start reading on the socket.
if (!StartRead()) return -1;
if (!WaitForCompletionOrCancel()) return -1;
// complete the read
if (!CompleteRead()) return -1;
// Take a packet from the buffer (or keep building one)
PopPacket();
}
}
void PopPacket()
{
if (!gotHeader)
{
int size;
BYTE* pData = readBuffer.GetContiguousBlock(size);
if (size < sizeof(MessageType))
return;
MessageType msg = *((MessageType*)pData);
int len;
switch(msg)
{
case MsgTelemetryNoScan:
len = sizeof(Packet_TelemetryNoData);
break;
case MsgTelemetryWithScan:
len = sizeof(Packet_TelemetryWithData);
break;
case MsgMeasurementRun:
len = sizeof(Packet_MeasurementRun);
break;
case
...
read more »