unit BoundedBuf;

{ Martin Harvey 24/4/2000

  Bounded (ring) buffer of untyped pointers, intended for one producer thread
  calling PutItem and one consumer thread calling GetItem. Flow control is
  done with two Win32 semaphores and the buffer indices are protected by a
  Win32 mutex. }

interface

uses
  Windows, SysUtils;

const
  DefaultWaitTime = 5000; { Five second wait on mutexes }

type
  { Dynamic arrays are deliberately avoided; memory is allocated explicitly,
    the "C" way. Think of TBufferEntries as ^(array of pointer). }
  TBufferEntries = ^Pointer;

  { No constructor is declared: the zero / false / nil defaults of a freshly
    created object are the "not initialised" state. Assigning Size
    initialises the buffer. }
  TBoundedBuffer = class
  private
    FBufInit: boolean;      { True while FBuf and the handles are set up }
    FBufSize: integer;      { Number of pointer slots allocated in FBuf }
    FBuf: TBufferEntries;   { Start of the slot array }
    FReadPtr,               { Index of the next used entry in the buffer }
    FWritePtr: integer;     { Index of the next free entry in the buffer }
    FEntriesFree,
    FEntriesUsed: THandle;  { Flow control semaphores }
    FCriticalMutex: THandle; { Critical section mutex }
  protected
    procedure SetSize(NewSize: integer);
  public
    procedure ResetState;
    destructor Destroy; override;
    function PutItem(NewItem: Pointer): boolean;
    function GetItem: Pointer;
  published
    property Size: integer read FBufSize write SetSize;
  end;

implementation

const
  FailMsg1 = 'Flow control failed, or buffer not initialised';
  FailMsg2 = 'Critical section failed, or buffer not initialised';

{ Allocates the slot array and creates the synchronisation objects.

  NewSize: requested number of slots; values below 2 are raised to 2.

  If the buffer is already initialised it is reset first, which discards any
  queued items. If any handle cannot be created, the buffer is reset again
  and left uninitialised (Size then reads back as 0). }
procedure TBoundedBuffer.SetSize(NewSize: integer);
begin
  if FBufInit then
    ResetState;
  if NewSize < 2 then
    NewSize := 2;
  FBufSize := NewSize;
  GetMem(FBuf, SizeOf(Pointer) * FBufSize);
  FillMemory(FBuf, SizeOf(Pointer) * FBufSize, 0);
  FBufInit := true;
  { All synchronisation objects are anonymous (no name is passed). }
  FCriticalMutex := CreateMutex(nil, false, nil);
  { FEntriesFree starts at FBufSize - 1, so at most FBufSize - 1 items can be
    queued at once; FEntriesUsed starts at 0 because the buffer is empty.
    Both semaphores have a maximum count of FBufSize. }
  FEntriesFree := CreateSemaphore(nil, FBufSize - 1, FBufSize, nil);
  FEntriesUsed := CreateSemaphore(nil, 0, FBufSize, nil);
  if (FCriticalMutex = 0) or (FEntriesFree = 0) or (FEntriesUsed = 0) then
    ResetState;
end;

{ Closes the handles and frees the slot array, returning the object to its
  "not initialised" state. Does nothing if the buffer is not initialised.

  Each semaphore is released once before being closed so that a thread
  blocked on it wakes up. This assumes only one producer and one consumer.
  The mutex is acquired here and closed without being released; a thread
  still waiting on it is left to time out after DefaultWaitTime. }
procedure TBoundedBuffer.ResetState;
begin
  if FBufInit then
  begin
    { Best effort: the result is ignored, so the reset proceeds even if the
      mutex could not be acquired. }
    WaitForSingleObject(FCriticalMutex, DefaultWaitTime);
    FBufInit := false;
    FBufSize := 0;
    { Indices must restart at zero: stale values could lie beyond the end of
      a smaller buffer allocated by a later SetSize. }
    FReadPtr := 0;
    FWritePtr := 0;
    FreeMem(FBuf);
    FBuf := nil;
    { Handles may be 0 here when SetSize failed to create them, so each one
      is checked before use and cleared after closing. }
    if FEntriesFree <> 0 then
    begin
      ReleaseSemaphore(FEntriesFree, 1, nil);
      CloseHandle(FEntriesFree);
      FEntriesFree := 0;
    end;
    if FEntriesUsed <> 0 then
    begin
      ReleaseSemaphore(FEntriesUsed, 1, nil);
      CloseHandle(FEntriesUsed);
      FEntriesUsed := 0;
    end;
    if FCriticalMutex <> 0 then
    begin
      CloseHandle(FCriticalMutex);
      FCriticalMutex := 0;
    end;
  end;
end;

{ Called by the producer thread. Stores NewItem in the next free slot.

  Blocks without timeout until a slot is free, then waits up to
  DefaultWaitTime for the mutex.

  Returns true if the item was stored; false if either wait did not return
  WAIT_OBJECT_0 or the buffer is not initialised. }
function TBoundedBuffer.PutItem(NewItem: Pointer): boolean;
var
  Slot: TBufferEntries;
begin
  Result := false;
  { WAIT(EntriesFree) }
  if WaitForSingleObject(FEntriesFree, INFINITE) <> WAIT_OBJECT_0 then
    exit;
  if WaitForSingleObject(FCriticalMutex, DefaultWaitTime) <> WAIT_OBJECT_0 then
    exit;
  try
    if not FBufInit then
      exit;
    Slot := FBuf;
    Inc(Slot, FWritePtr); { typed pointer: advances by whole slots }
    Slot^ := NewItem;
    FWritePtr := (FWritePtr + 1) mod FBufSize;
  finally
    { Once acquired, the mutex is released on every exit path. }
    ReleaseMutex(FCriticalMutex);
  end;
  { SIGNAL(EntriesUsed) }
  ReleaseSemaphore(FEntriesUsed, 1, nil);
  Result := true;
end;

{ Called by the consumer thread. Removes and returns the oldest item.

  Blocks without timeout until an item is available, then waits up to
  DefaultWaitTime for the mutex.

  Returns the stored pointer, or nil if either wait did not return
  WAIT_OBJECT_0 or the buffer is not initialised. A stored nil cannot be
  told apart from failure by the return value alone. }
function TBoundedBuffer.GetItem: Pointer;
var
  Slot: TBufferEntries;
begin
  Result := nil;
  { WAIT(EntriesUsed) }
  if WaitForSingleObject(FEntriesUsed, INFINITE) <> WAIT_OBJECT_0 then
    exit;
  if WaitForSingleObject(FCriticalMutex, DefaultWaitTime) <> WAIT_OBJECT_0 then
    exit;
  try
    if not FBufInit then
      exit;
    Slot := FBuf;
    Inc(Slot, FReadPtr); { typed pointer: advances by whole slots }
    Result := Slot^;
    FReadPtr := (FReadPtr + 1) mod FBufSize;
  finally
    { Once acquired, the mutex is released on every exit path. }
    ReleaseMutex(FCriticalMutex);
  end;
  { SIGNAL(EntriesFree) }
  ReleaseSemaphore(FEntriesFree, 1, nil);
end;

{ Releases the buffer memory and handles before the object is destroyed. }
destructor TBoundedBuffer.Destroy;
begin
  ResetState;
  inherited Destroy;
end;

end.