CircAqNumpyArray.py#

#############################################################
 # 
 # FILE:        CircAqNumPyArr.py
 # DATE:        04/28/2022
 # COMPANY:     BitFlow, Inc.
 # DESCRIPTION: Example illustrating the capture of image data
 #              into a Numpy array for further processing in
 #              Python. The image is being displayed using an 
 #              OpenCV imshow window.
 #
#############################################################

import platform
import sys

if(platform.system() == 'Windows'):
    import msvcrt

    if (sys.version_info.major >= 3 and sys.version_info.minor >= 8):
        import os
        #Following lines specifying, the location of DLLs, are required for Python Versions 3.8 and greater
        os.add_dll_directory("C:\BitFlow SDK 6.5\Bin64")
        os.add_dll_directory("C:\Program Files\CameraLink\Serial")

import BFModule.BufferAcquisition as Buf

import numpy, cv2
import threading
import time

from datetime import datetime

def AcqThread(ThreadName, CirAq, BufArray):
    global keepUpdating

    while(keepUpdating):
        try:
            CurBuf = CirAq.WaitForFrame(1000)
            print(CurBuf.BufferNumber, '/ 100')
            #converting to 16 bits, as OpenCV imshow does not support uint32 images
            img = BufArray[CurBuf.BufferNumber].astype(numpy.uint16)
            cv2.imshow("Image Window", img)
            cv2.waitKey(1)
        except Exception as e:
            #print(e)
            pass

# The main function
def main():
    global keepUpdating

    print('Circular Acquisition Example')
    print('---------------------------------')

    #CirAq = Buf.clsCircularAcquisition(Buf.ErrorMode.ErIgnore)
    CirAq = Buf.clsCircularAcquisition()

    #Call Open board function by showing the Board select dialog
    CirAq.Open()

    #Get board info parameters to create and open a display surface
    imageBitsPerPix = CirAq.GetBoardInfo(Buf.InquireParams.BitsPerPix)



    #Get the number of buffers to allocate
    while 1:
        try:
            numBuffers = int(input('\nEnter number of buffers to allocate: '))
        except ValueError:
            print('Invalid entry. Try again.')
        else:
            break

     #Allocate the requested number of buffers
    BufArray = CirAq.BufferSetup(numBuffers)

   #Setup acquisition using the default options
    #CirAq.AqSetup(Buf.SetupOptions.setupDefault)
    CirAq.AqSetupROI(0,0,512,1024,Buf.SetupOptions.setupDefault)

    t1 = threading.Thread(target=AcqThread, args=('AcqThread', CirAq, BufArray))

    keepUpdating = True
    t1.start()


    print("\nPress \'G\' to start Acquisition")
    print("Press \'S\' to Stop Acquisition")
    if(CirAq.GetTriggerMode() != Buf.TriggerModes.FreeRun):
        print("Press \'T\' to issue a SW Trigger")
    print("Press \'R\' to read a register value")
    print("Press \'W\' to write to a register")
    print("Press \'A\' to get Acquisition Status")
    print("\nPress \'X\' to exit test\n")


    while keepUpdating:
        ch = input() #input('\nEnter command: ')
        if(ch.upper()=='X'):
            if(CirAq.GetAcqStatus().Start):
                CirAq.AqControl(Buf.AcqCommands.Stop, Buf.AcqControlOptions.Wait)
            print('Exit Program.')
            keepUpdating = False
            break
        elif (ch.upper() == 'G'):
            print('Aquisition Started.')
            #Start acquisition
            CirAq.AqControl(Buf.AcqCommands.Start, Buf.AcqControlOptions.Async)
        elif(ch.upper() == 'S'):
            #Stop acquisition
            CirAq.AqControl(Buf.AcqCommands.Stop, Buf.AcqControlOptions.Async)
            print('Aquisition Stopped.')
            CirAq.GetCaptureStatus()
        elif(ch.upper() == 'T'):
            #Issue trigger
            CirAq.IssueSoftwareTrigger()
            print('Trigger Sent.')                
        elif(ch.upper() == 'R'):
            print('Reading register value . . .')
            regName = input('Enter Register Name:')
            try:
                regID = CirAq.RegId(regName)
            except Exception as e:
                print(e)
            else:
                print('Reg ', regName, ' value = ', CirAq.RegPeek(regID))                        
        elif(ch.upper() == 'W'):
            print('Writing to a register . . .')
            regName = input('Enter Register Name:')
            try:
                regID = CirAq.RegId(regName)
            except Exception as e:
                print(e)
            else:
                regVal = int(input("Enter value to write: "))
                CirAq.RegPoke(regID, regVal)
        elif(ch.upper() == 'A'):
            print('Acquisition Status:')
            AqStat = CirAq.GetAcqStatus()
            print('  -Start:\t', AqStat.Start)
            print('  -Stop:\t', AqStat.Stop)
            print('  -Abort:\t', AqStat.Abort)
            print('  -Pause:\t', AqStat.Pause)
            print('  -Cleanup:\t', AqStat.Cleanup)
            print(CirAq.GetCaptureStatus())
  
    keepUpdating = False

    t1.join()

    # destroy the OpenCV image window
    cv2.destroyAllWindows()

    #Clean up acquisition
    CirAq.AqCleanup()

    #Free allocated resources
    CirAq.BufferCleanup()

    #Close the board
    CirAq.Close()


if __name__ == "__main__":
    
   main()