/* CIeventMonitor.c */

/*****************************************************************************

	CIeventMonitor.c			Source for BitFlow CI CIeventMonitor program

	Mar 28,		2022	BitFlow, Inc./JTG

	© Copyright 2022, BitFlow, Inc. All rights reserved.

	Tabstops are 4

*****************************************************************************/

/*==========================================================================*/
/*
**	For access to command line display.
*/
#include	<stdio.h>
#include	<stdarg.h>
#include	<string.h>
/*
**	For checking for keypress
*/
#include	<sys/time.h>
#include	<sys/types.h>
#include	<unistd.h>
/*
**	For event wait threads.
*/
#include	<pthread.h>
#include	<errno.h>
/*
**	For access to BitFlow camera interface library.
*/
#include	"BFciLib.h"
/*==========================================================================*/
/*
**	Atomic fetch and add.
**
**	Perform an atomic fetch-and-add on the given variable, for the given
**	increment. Evaluates to the value before the increment.
**
**	ATM must be an lvalue (its address is taken). Implemented with the
**	compiler's __sync_fetch_and_add builtin.
*/
#define ATOMIC_ADD(ATM, INC) (__sync_fetch_and_add(&(ATM), (INC)))
/*--------------------------------------------------------------------------*/

/*
**	File-scope program state.
**
**	The option variables are filled in by DecodeArgs() from the command
**	line. sRcvdPkts is shared by both worker threads and main() and is only
**	accessed through ATOMIC_ADD; sEvtPkts is incremented by EventThread()
**	only, and sHrtBtPkts by HearbeatThread() only.
*/
static int sExitAns = 0;		/* program exit code */
static tCIp sCIp = NULL;		/* device open token */
static int sNdx = 0;			/* device index (-x) */
static tCIU32 sCXPlink = 0xFF;	/* cxp link to monitor (-l, default master) */
static tCIU8 sDoEvts = 1;		/* do monitor event packets (cleared by -H) */
static tCIU8 sDoHrtBt = 1;		/* do monitor heartbeat packets (cleared by -E) */
static tCIU32 sMaxPkts = 0;		/* total packets to display (-m, 0 means no limit) */
static volatile tCIU32 sRcvdPkts = 0;	/* total count of signals received (atomic) */
static tCIU32 sSkipPkts = 0;	/* packets to skip between display (-s) */
static tCIU32 sEvtPkts = 0;		/* count of event packets handled */
static tCIU32 sHrtBtPkts = 0;	/* count of heartbeat packets handled */
static tCIU32 sTimeoutMs = 0;	/* timeout after N ms of no events (-t) */
static tCIU8 sCondensed = 0;	/* use a condensed format to print events (-c) */
/*--------------------------------------------------------------------------*/
/*
**	Console output helpers.
**
**	SHOW((fmt, ...)) prints a printf-style message to stdout and flushes it.
**	ERR((fmt, ...)) does the same, preceded by an "ERR: " prefix.
**
**	The printf argument list is passed inside a second pair of parentheses.
**	Both macros expand to one do/while(0) statement, so they behave like a
**	function call followed by ';' even as the body of an unbraced if/else.
*/
#define	SHOW(x)	do { (void)printf x ; (void)fflush(stdout); } while (0)
#define	ERR(x)	do { SHOW(("ERR: ")); SHOW(x); } while (0)

static char *sArgv0 = NULL;		/* executable name, set by main() for the help banner */
static void ShowHelp(void)
/*
**	Print the usage text: a banner with the program name and build
**	date/time, followed by the option summary.
*/
{
	/*
	 ** One entry per output line, emitted in order after the banner.
	 */
	static const char *const helpLines[] =
	{
		"   -h           display this message and exit\n",
		"\n",
		"   -x ndx       choose available device ndx (default 0)\n",
		"   -l lnk       cxp link to monitor for events (default master)\n",
		"   -E           only monitor event packets\n",
		"   -H           only monitor heartbeat packets\n",
		"   -m maxPkts   max packets to display (default infinite)\n",
		"   -s skipPkts  packets to skip between display (default 0)\n",
		"   -t mSecs     timeout after mSecs of no events received (default infinite)\n",
		"   -c           use condensed message format\n",
		"\n",
		"  : initialize an interface and display events and heartbeats\n",
		"      display ends with newline\n",
		"\n"
	};
	size_t ndx;

	SHOW(("%s of " __DATE__ " at " __TIME__ "\n", sArgv0));

	for (ndx = 0; ndx < sizeof(helpLines) / sizeof(helpLines[0]); ndx++)
	{
		SHOW(("%s", helpLines[ndx]));
	}
}
/*==========================================================================*/
#include	<time.h>
#include	<sys/timeb.h>
static tCIDOUBLE GetTime(void)
/*
**	Return fractional seconds
*/
{
	tCIDOUBLE ans = 0.0;

#ifdef _POSIX_TIMERS

	struct timespec tp;

	(void)clock_gettime(CLOCK_MONOTONIC_RAW, &tp);
	ans = (tCIDOUBLE) tp.tv_sec;
	ans += ((tCIDOUBLE) tp.tv_nsec) / 1000000000.0;

#else

	struct timeb tb;

	(void)ftime(&tb);
	ans = tb.millitm;
	ans /= 1000.0;
	ans += tb.time;

#endif

	return (ans);
}

/*--------------------------------------------------------------------------*/
static int DecodeArgs(int argc, char **argv)
/*
**	Parse the command line into the file-scope option variables.
**
**	argc/argv are the values handed to main(); argv[0] is skipped.
**
**	Returns kCIEnoErr when the program should carry on. Returns non-zero
**	when it should exit instead:
**	  - after -h (help is shown, sExitAns is left untouched);
**	  - after an unknown option, an option whose value is missing, or a
**	    value that does not parse as a number (help is shown and sExitAns
**	    is set to 1).
*/
{
	char *opt;
	char *val;
	int parsed;

	argv += 1;
	argc -= 1;					/* skip program name */

	while (argc-- > 0)
	{
		opt = *argv++;
		if (opt[0] != '-')
		{
			ERR(("Do not know '%s' arg\n", opt));
			ShowHelp();
			sExitAns = 1;
			return (sExitAns);
		}

		/*
		 ** Options that take a value consume the next argument. Check it
		 ** is really there: past the last argument *argv is the NULL
		 ** terminator, which must not be handed to sscanf.
		 */
		val = NULL;
		switch (opt[1])
		{
		case 'x':
		case 'l':
		case 'm':
		case 's':
		case 't':
			if (argc <= 0)
			{
				ERR(("Missing value for arg '%s'\n", opt));
				ShowHelp();
				sExitAns = 1;
				return (sExitAns);
			}
			val = *argv++;
			argc -= 1;
			break;
		default:
			break;
		}

		/*
		 ** Apply the option. The unsigned (tCIU32) settings are scanned
		 ** with %u so the conversion matches the object type.
		 */
		parsed = 1;
		switch (opt[1])
		{
		case 'h':
			ShowHelp();
			return (1);
		case 'x':
			parsed = sscanf(val, "%d", &sNdx);
			break;
		case 'l':
			parsed = sscanf(val, "%u", &sCXPlink);
			break;
		case 'E':
			sDoEvts = 1;
			sDoHrtBt = 0;
			break;
		case 'H':
			sDoEvts = 0;
			sDoHrtBt = 1;
			break;
		case 'm':
			parsed = sscanf(val, "%u", &sMaxPkts);
			break;
		case 's':
			parsed = sscanf(val, "%u", &sSkipPkts);
			break;
		case 't':
			parsed = sscanf(val, "%u", &sTimeoutMs);
			break;
		case 'c':
			sCondensed = 1;
			break;
		default:
			ERR(("Do not know arg '%s'\n", opt));
			ShowHelp();
			sExitAns = 1;
			return (sExitAns);
		}

		if (1 != parsed)
		{
			ERR(("Bad value '%s' for arg '%s'\n", val, opt));
			ShowHelp();
			sExitAns = 1;
			return (sExitAns);
		}
	}

	return (kCIEnoErr);
}

/*--------------------------------------------------------------------------*/
static int CheckForKeyboardInput(void)
/*
**	Poll stdin without blocking.
**
**	Return 0 if no input available from stdin, 1 else. When input is
**	available, one line of it (at most one buffer's worth) is read and
**	discarded.
**
**	Note: the console needs a newline in order to post input.
*/
{
	fd_set readfds;
	struct timeval tv;
	char buff[1024];
	int fd;

	fd = fileno(stdin);
	if (fd < 0)
	{
		/*
		 **   No usable descriptor behind stdin, so nothing to poll.
		 */
		return (0);
	}

	FD_ZERO(&readfds);
	FD_SET(fd, &readfds);

	/*
	 **   A zeroed timeout makes select() return immediately.
	 */
	(void)memset(&tv, '\0', sizeof(tv));

	/*
	 **   select() takes the highest descriptor number plus one, so derive
	 **   it from the descriptor actually being watched.
	 */
	if ((select(fd + 1, &readfds, NULL, NULL, &tv) == 1) && FD_ISSET(fd, &readfds))
	{
		/*
		 **   Consume the line.
		 */
		(void)fgets(buff, (int)sizeof(buff), stdin);
		return (1);
	}

	return (0);
}

/*==========================================================================*/
void* EventThread(void *d)
/*
**	Wait for event packets continuously, or until count exceeds max.
**
**	d is the signal handle (tCISHP) that main() passes to pthread_create().
**	Always returns NULL.
**
**	For every signal received one event packet is read from sCXPlink and
**	its messages are parsed; every (1 + sSkipPkts)-th packet is printed, in
**	long or condensed (sCondensed) form.
**
**	The loop ends on keyboard input, when the wait is canceled, when the
**	shared packet count reaches sMaxPkts (if non-zero), or on any library
**	error (sExitAns is then set to 1). On the way out a summary is printed
**	and CiSignalWaitCancel() is called on the handle.
*/
{
	tCISHP hSig = (tCISHP)d;
	tCIRC circ = kCIEnoErr;
	tCIsignalEventData evtData;
	tCiCXPeventPacket cxpEvtPkt;
	tCiCXPeventMessage cxpEvtMsg;
	tCIU32 i, j, byteCnt;
	tCIU8 *bPtr;
	tCIDOUBLE entryTime, exitTime;
	tCIU8 dispPkt;
	tCIU32 cnt;

	/*
	 ** Record the entry time.
	 */
	entryTime = GetTime();

	/*
	 ** Loop continuously until we reach an exit condition.
	 */
	while (1)
	{
		if (CheckForKeyboardInput())
		{
			SHOW(("Detected keyboard input\n"));
			break;
		}

		/*
		 ** Decide if we want to display this packet.
		 */
		dispPkt = (0 == sSkipPkts) || (0 == (sEvtPkts % (1 + sSkipPkts)));

		/*
		 ** Wait indefinitely for the signal to be emitted.
		 */
		circ = CiSignalWait(sCIp, hSig, sizeof(evtData), &evtData, -1);
		if (kCIEcanceledErr == circ)
		{
			SHOW(("Signal canceled.\n"));
			break;
		}
		else if (kCIEnoErr != circ)
		{
			ERR(("CiSignalWait gave '%s'\n", CiErrStr(circ)));
			sExitAns = 1;
			break;
		}

		if (dispPkt && !sCondensed)
		{
			SHOW(("CiSignalWait (0x%08X'%08X/%d) OK\n", (tCIU32)(evtData.timestamp >> 32), (tCIU32)(evtData.timestamp & 0xFFFFFFFF), evtData.count));
		}

		/*
		 ** Read one event packet.
		 */
		circ = CiCXPreadEventPacket(sCIp, sCXPlink, sizeof(cxpEvtPkt), &cxpEvtPkt);
		if (kCIEnoErr != circ)
		{
			ERR(("CiCXPreadEventPacket gave '%s'\n", CiErrStr(circ)));
			sExitAns = 1;
			break;
		}

		if (dispPkt)
		{
			if (!sCondensed)
			{
				// event packet data
				SHOW(("Host connection ID: 0x%08X\n", cxpEvtPkt.masterHostConnectionID));
				SHOW(("Packet Tag: 0x%08X\n", cxpEvtPkt.packetTag));
				SHOW(("Data size: 0x%08X (words)\n", cxpEvtPkt.numWords));
			}
			else
			{
				SHOW(("%08d - Packet Tag: 0x%08X, Data size: 0x%08X, ", sEvtPkts, cxpEvtPkt.packetTag, cxpEvtPkt.numWords));
			}
		}

		/*
		 ** Extract the event packet data, one message per iteration, until
		 ** a message of size 0 is returned or the byte budget is used up.
		 **
		 ** NOTE(review): payload bytes are only added to byteCnt while
		 ** they are being printed, so the budget is looser for packets
		 ** that are not displayed in long form -- confirm this is intended.
		 */
		i = 0;
		byteCnt = 0;
		do
		{
			circ = CiCXPparseEventPacket(sCIp, i, sizeof(cxpEvtPkt), &cxpEvtPkt, sizeof(cxpEvtMsg), &cxpEvtMsg);
			if (kCIEnoErr != circ)
			{
				ERR(("CiCXPparseEventPacket(%d) gave '%s'\n", i, CiErrStr(circ)));
				sExitAns = 1;
				break;
			}
			else if (cxpEvtMsg.size > 0)
			{
				if (dispPkt && !sCondensed)
				{
					SHOW(("\nMessage number: %d\n", i));
					SHOW(("Message Size: %d (bytes)\n", cxpEvtMsg.size));
					SHOW(("Message Namespace: %d\n", cxpEvtMsg.nameSpace));
					SHOW(("Event ID: %d\n", cxpEvtMsg.eventID));

					SHOW(("Timestamp: 0x%08X'%08X\n", (tCIU32)(cxpEvtMsg.timeStamp >> 32), (tCIU32)(cxpEvtMsg.timeStamp & 0xffffffff)));

					SHOW(("Data\n"));

					/*
					 ** Print each byte of the event packet.
					 */
					j = 0;
					bPtr = (tCIU8*) cxpEvtMsg.data;
					while (cxpEvtMsg.size > 12 + j && kCImaxEventPayload * sizeof(tCIU32) > byteCnt)
					{
						/*
						 ** +12 to exclude 3 word header.
						 */
						SHOW(("%d 0x%02X\n", j, bPtr[j]));

						/*
						 ** +1 for the current byte.
						 */
						j++;
						byteCnt++;
					}
				}

				/*
				 ** +12 for 3 word header.
				 */
				byteCnt += 12;
			}
			++i;
		}
		while (cxpEvtMsg.size > 0 && kCImaxEventPayload * sizeof(tCIU32) > byteCnt);

		/*
		 ** The break above only leaves the parse loop. A parse failure has
		 ** already been reported and flagged in sExitAns, so leave the
		 ** thread here, like every other error path does.
		 */
		if (kCIEnoErr != circ)
		{
			break;
		}

		if (dispPkt && sCondensed)
		{
			SHOW(("Num Mess: %08d\r", i - 1));
		}

		++sEvtPkts;
		cnt = ATOMIC_ADD(sRcvdPkts, 1);

		/*
		 ** cnt is the shared total before our increment, hence the +1.
		 */
		if (0 != sMaxPkts && (cnt + 1) >= sMaxPkts)
		{
			SHOW(("Max count exceeded.\n"));
			break;
		}
	}

	/*
	 ** Get the exit time, and print a summation message.
	 */
	exitTime = GetTime();

	SHOW(("\nEvent thread leaves after %0.2f sec (%0.2f EPS)\n", (exitTime - entryTime), (tCIDOUBLE)sEvtPkts / (exitTime - entryTime)));

	/*
	 ** Cancel CiSignalExec (if we're the zero-index signal).
	 */
	CiSignalWaitCancel(sCIp, hSig);

	return(NULL);
}

/*--------------------------------------------------------------------------*/
void* HearbeatThread(void *d)
/*
**	Wait for heartbeat packets continuously, or until count exceeds max.
*/
{
	tCISHP hSig = (tCISHP)d;
	tCIRC circ = kCIEnoErr;
	tCIsignalEventData evtData;
	tCiCXPheartbeat cxpHeartbeat;
	tCIU8 dispPkt;
	tCIU32 cnt;

	tCIDOUBLE entryTime, exitTime;

	/*
	 ** Record the entry time.
	 */
	entryTime = GetTime();

	/*
	 ** Loop continuously until we reach an exit condition.
	 */
	while (1)
	{
		if (CheckForKeyboardInput())
		{
			SHOW(("Detected keyboard input\n"));
			break;
		}

		/*
		 ** Decide if we want to display this packet.
		 */
		dispPkt = (0 == sSkipPkts) || (0 == (sHrtBtPkts % (1 + sSkipPkts)));

		/*
		 ** Wait indefinitely for the signal to be emitted.
		 */
		circ = CiSignalWait(sCIp, hSig, sizeof(evtData), &evtData, -1);
		if (kCIEcanceledErr == circ)
		{
			SHOW(("Signal canceled.\n"));
			break;
		}
		else if (kCIEnoErr != circ)
		{
			ERR(("CiSignalWait gave '%s'\n", CiErrStr(circ)));
			sExitAns = 1;
			break;
		}

		/*
		 ** Get the latest heartbeat packet.
		 */
		circ = CiCXPgetLastHeartbeat(sCIp, sCXPlink, sizeof(cxpHeartbeat), &cxpHeartbeat);
		if (kCIEnoErr != circ)
		{
			ERR(("CiCXPgetLastHeartbeat gave '%s'\n", CiErrStr(circ)));
			sExitAns = 1;
			break;
		}

		if (dispPkt)
		{
			tCIU64 delta = cxpHeartbeat.packetDeviceTime - cxpHeartbeat.packetHostTime;
			SHOW(("%08d - ID: 0x%08X PDT: 0x%08X'%08X PHT: 0x%08X'%08X Delta: 0x%08X'%08X\n", sHrtBtPkts, cxpHeartbeat.packetConnectionID,
				(tCIU32)(cxpHeartbeat.packetDeviceTime >> 32), (tCIU32)(cxpHeartbeat.packetDeviceTime & 0xFFFFFFFF),
				(tCIU32)(cxpHeartbeat.packetHostTime >> 32), (tCIU32)(cxpHeartbeat.packetHostTime & 0xFFFFFFFF),
				(tCIU32)(delta >> 32), (tCIU32)(delta & 0xFFFFFFFF)));
		}

		++sHrtBtPkts;
		cnt = ATOMIC_ADD(sRcvdPkts, 1);

		if (0 != sMaxPkts && (cnt + 1) >= sMaxPkts)
		{
			SHOW(("Max count exceeded.\n"));
			break;
		}
	}

	/*
	 ** Get the exit time, and print a summation message.
	 */
	exitTime = GetTime();

	SHOW(("\nHeartbeat thread leaves after %0.2f sec (%0.2f EPS)\n", (exitTime - entryTime), (tCIDOUBLE)sHrtBtPkts / (exitTime - entryTime)));

	/*
	 ** Cancel CiSignalExec (if we're the zero-index signal).
	 */
	CiSignalWaitCancel(sCIp, hSig);

	return(NULL);
}

/*==========================================================================*/
static void StopSignalThreads(int sigCnt, tCISHP *hSigAry, pthread_t *thrdAry)
/*
**	Cancel, join and clean up the first sigCnt signal threads, last first.
*/
{
	while (sigCnt-- > 0)
	{
		CiSignalWaitCancel(sCIp, hSigAry[sigCnt]);
		pthread_join(thrdAry[sigCnt], NULL);
		CiSignalCleanup(sCIp, hSigAry[sigCnt], kCISQflush, NULL);
	}
}

int main (int argc, char *argv[])
/*
**	Main method. Start the threads, then wait for an exit condition.
**
**	Decodes the command line, opens the board, sets up the enabled signals
**	(events and/or heartbeats) with one worker thread each, then services
**	the signals with CiSignalExec() until keyboard input, the max packet
**	count, a cancel, or an error ends the run.
**
**	Returns sExitAns, or 1 when setup fails.
*/
{
	tCIRC circ;
	int thrdErr;				/* pthread_create result (an errno value) */

	tCIU32 pktsLeft;
	tCIU32 cnt = 0;				/* out parameter of CiSignalExec */

	int sigCnt = 0;
	tCISHP hSigAry[2] = { NULL, NULL };
	pthread_t thrdAry[2] = { 0, 0 };

	/*
	 ** Copy application path for help.
	 */
	sArgv0 = *argv;

	/*
	 ** Decode the command arguments
	 */
	if (DecodeArgs(argc, argv) != kCIEnoErr)
		return(sExitAns);

	/*
	 ** Open the board.
	 */
	circ = CiVFGopen(sNdx, kCIBO_writeAccess, &sCIp);
	if (kCIEnoErr != circ)
	{
		ERR(("CiVFGopen(%d) gave '%s'\n", sNdx, CiErrStr(circ)));
		return(1);
	}

	/*
	 ** Open the event signal, if enabled, and launch its thread.
	 */
	if (sDoEvts)
	{
		SHOW(("Setup CXPeventRcvd on %d/%d.\n", sNdx, sCXPlink));

		/*
		 ** Setup the signal
		 */
		circ = CiSignalSetup(sCIp, kCIsigCXPeventRcvd, sCXPlink, &hSigAry[sigCnt]);
		if (kCIEnoErr != circ)
		{
			ERR(("CiSignalSetup(%d/CXPeventRcvd/%d) gave '%s'\n", sNdx, sCXPlink, CiErrStr(circ)));
			CiVFGclose(sCIp);
			return(1);
		}

		/*
		 ** Flush the event packet signal queue.
		 */
		CiSignalQueueOp(sCIp, hSigAry[sigCnt], kCISQflush, NULL);

		/*
		 ** Launch the thread. pthread_create() returns the error number
		 ** itself and does not set errno, so report it with strerror().
		 */
		thrdErr = pthread_create(&thrdAry[sigCnt], NULL, &EventThread, hSigAry[sigCnt]);
		if (0 != thrdErr)
		{
			ERR(("pthread_create(EventThread) gave (%s)\n", strerror(thrdErr)));

			CiSignalCleanup(sCIp, hSigAry[sigCnt], kCISQflush, NULL);
			CiVFGclose(sCIp);
			return(1);
		}

		sigCnt++;
	}

	/*
	 ** Open the heartbeat signal, if enabled, and launch its thread.
	 */
	if (sDoHrtBt)
	{
		SHOW(("Setup CXPhbRcvd on %d/%d.\n", sNdx, sCXPlink));

		circ = CiSignalSetup(sCIp, kCIsigCXPhbRcvd, sCXPlink, &hSigAry[sigCnt]);
		if (kCIEnoErr != circ)
		{
			ERR(("CiSignalSetup(%d/CXPhbRcvd/%d) gave '%s'\n", sNdx, sCXPlink, CiErrStr(circ)));

			/*
			 ** Take down whatever was started before us.
			 */
			StopSignalThreads(sigCnt, hSigAry, thrdAry);

			CiVFGclose(sCIp);
			return(1);
		}

		/*
		 ** Flush the heartbeat signal queue.
		 */
		CiSignalQueueOp(sCIp, hSigAry[sigCnt], kCISQflush, NULL);

		/*
		 ** Launch the thread.
		 */
		thrdErr = pthread_create(&thrdAry[sigCnt], NULL, &HearbeatThread, hSigAry[sigCnt]);
		if (0 != thrdErr)
		{
			ERR(("pthread_create(HearbeatThread) gave (%s)\n", strerror(thrdErr)));

			/*
			 ** Release the signal that has no thread first, while sigCnt
			 ** still indexes it, then take down the earlier thread(s).
			 */
			CiSignalCleanup(sCIp, hSigAry[sigCnt], kCISQflush, NULL);
			StopSignalThreads(sigCnt, hSigAry, thrdAry);

			CiVFGclose(sCIp);
			return(1);
		}

		sigCnt++;
	}

	SHOW(("Executing signal service for (%d). Press key to exit.\n", sMaxPkts));

	/*
	 ** Service signals until an exit condition or an error occurs.
	 */
	pktsLeft = sMaxPkts;
	while (sExitAns == 0)
	{
		if (CheckForKeyboardInput())
		{
			SHOW(("Detected keyboard input\n"));
			break;
		}

		/*
		 ** See if we have satisfied the max packet count.
		 */
		if (0 != sMaxPkts)
		{
			/* adding 0 is an atomic read of the shared counter */
			const tCIU32 pktsRcvd = ATOMIC_ADD(sRcvdPkts, 0);
			if (pktsRcvd >= sMaxPkts)
			{
				SHOW(("Max count exceeded.\n"));
				break;
			}

			pktsLeft = sMaxPkts - pktsRcvd;
		}

		/*
		 ** Service all enabled signals.
		 */
		circ = CiSignalExec(sCIp, sigCnt, hSigAry, pktsLeft, &cnt, sTimeoutMs);

		if (kCIEcanceledErr == circ)
		{
			SHOW(("CiSignalExec canceled.\n"));
			break;
		}
		else if (kCIEnoErr != circ)
		{
			sExitAns = 1;
			ERR(("CiSignalExec gave (%s)\n", CiErrStr(circ)));
			break;
		}
	}

	/*
	 ** Cleanup everything.
	 */
	StopSignalThreads(sigCnt, hSigAry, thrdAry);

	CiVFGclose(sCIp);

	return(sExitAns);
}