/*****************************************************************************
CIeventMonitor.c Source for BitFlow CI CIeventMonitor program
Mar 28, 2022 BitFlow, Inc./JTG
© Copyright 2022, BitFlow, Inc. All rights reserved.
Tabstops are 4
*****************************************************************************/
/*==========================================================================*/
/*
** For access to command line display.
*/
#include <stdio.h>
#include <stdarg.h>
#include <string.h>
/*
** For checking for keypress
*/
#include <sys/time.h>
#include <sys/types.h>
#include <unistd.h>
/*
** For event wait threads.
*/
#include <pthread.h>
#include <errno.h>
/*
** For access to BitFlow camera interface library.
*/
#include "BFciLib.h"
/*==========================================================================*/
/*
** Atomic fetch and add.
**
** Perform an atomic fetch-and-add on the given variable, for the given
** increment. Evaluates to the value before the increment.
**
** ATM must be an lvalue (its address is taken); passing an increment of 0
** yields an atomic read of the current value. Built on the GCC/Clang
** __sync_fetch_and_add builtin.
*/
#define ATOMIC_ADD(ATM, INC) (__sync_fetch_and_add(&(ATM), (INC)))
/*--------------------------------------------------------------------------*/
static int sExitAns = 0; /* program exit code; a nonzero value also ends the wait loop in main */
static tCIp sCIp = NULL; /* device open token, filled in by CiVFGopen */
static int sNdx = 0; /* device index (-x) */
static tCIU32 sCXPlink = 0xFF; /* cxp link to monitor (-l) (default master) */
static tCIU8 sDoEvts = 1; /* do monitor event packets (cleared by -H) */
static tCIU8 sDoHrtBt = 1; /* do monitor heartbeat packets (cleared by -E) */
static tCIU32 sMaxPkts = 0; /* total packets to handle before exiting (-m); 0 means no limit */
static volatile tCIU32 sRcvdPkts = 0; /* total count of signals received by both threads (only touched via ATOMIC_ADD) */
static tCIU32 sSkipPkts = 0; /* packets to skip between display (-s) */
static tCIU32 sEvtPkts = 0; /* count of event packets handled (written by EventThread only) */
static tCIU32 sHrtBtPkts = 0; /* count of heartbeat packets handled (written by HearbeatThread only) */
static tCIU32 sTimeoutMs = 0; /* timeout in ms handed to CiSignalExec (-t) */
static tCIU8 sCondensed = 0; /* use a condensed format to print events (-c) */
/*--------------------------------------------------------------------------*/
/*
** Console output helpers.
**
** SHOW((fmt, ...)) prints to stdout with printf and flushes right away.
** ERR((fmt, ...)) does the same, prefixed with "ERR: ".
**
** The single macro argument is a complete, parenthesized printf argument
** list, which is why every call site uses double parentheses.
**
** Both expand to a do { } while (0) statement so that they behave as one
** statement and stay correct inside an unbraced if/else.
*/
#define SHOW(x) do { (void)printf x ; (void)fflush(stdout); } while (0)
#define ERR(x) do { SHOW(("ERR: ")); SHOW(x); } while (0)
static char *sArgv0 = NULL; /* name of executable (argv[0]), printed by ShowHelp */
static void ShowHelp(void)
/*
** Print the build banner (executable name, compile date and time) followed
** by the command line usage summary to stdout.
*/
{
    /* Usage text, one entry per output line, printed in order. */
    static const char *const usageLines[] =
    {
        " -h display this message and exit\n",
        "\n",
        " -x ndx choose available device ndx (default 0)\n",
        " -l lnk cxp link to monitor for events (default master)\n",
        " -E only monitor event packets\n",
        " -H only monitor heartbeat packets\n",
        " -m maxPkts max packets to display (default infinite)\n",
        " -s skipPkts packets to skip between display (default 0)\n",
        " -t mSecs timeout after mSecs of no events received (default infinite)\n",
        " -c use condensed message format\n",
        "\n",
        " : initialize an interface and display events and heartbeats\n",
        " display ends with newline\n",
        "\n"
    };
    const size_t lineCnt = sizeof(usageLines) / sizeof(usageLines[0]);
    size_t ln;

    SHOW(("%s of " __DATE__ " at " __TIME__ "\n", sArgv0));
    for (ln = 0; ln < lineCnt; ln++)
    {
        SHOW(("%s", usageLines[ln]));
    }
}
/*==========================================================================*/
#include <time.h>
#include <sys/timeb.h>
static tCIDOUBLE GetTime(void)
/*
** Return fractional seconds
*/
{
tCIDOUBLE ans = 0.0;
#ifdef _POSIX_TIMERS
struct timespec tp;
(void)clock_gettime(CLOCK_MONOTONIC_RAW, &tp);
ans = (tCIDOUBLE) tp.tv_sec;
ans += ((tCIDOUBLE) tp.tv_nsec) / 1000000000.0;
#else
struct timeb tb;
(void)ftime(&tb);
ans = tb.millitm;
ans /= 1000.0;
ans += tb.time;
#endif
return (ans);
}
/*--------------------------------------------------------------------------*/
static int DecodeArgs(int argc, char **argv)
/*
** Parse the input arguments.
*/
{
char *str;
argv += 1;
argc -= 1; /* skip program name */
while (argc-- > 0)
{
str = *argv++;
if (str[0] != '-')
{
ERR(("Do not know '%s' arg\n", str));
ShowHelp();
sExitAns = 1;
return (sExitAns);
}
switch (str[1])
{
case 'h':
ShowHelp();
return (1);
case 'x':
(void)sscanf(*argv, "%d", &sNdx);
argv += 1;
argc -= 1;
break;
case 'l':
(void)sscanf(*argv, "%d", &sCXPlink);
argv += 1;
argc -= 1;
break;
case 'E':
sDoEvts = 1;
sDoHrtBt = 0;
break;
case 'H':
sDoEvts = 0;
sDoHrtBt = 1;
break;
case 'm':
(void)sscanf(*argv, "%d", &sMaxPkts);
argv += 1;
argc -= 1;
break;
case 's':
(void)sscanf(*argv, "%d", &sSkipPkts);
argv += 1;
argc -= 1;
break;
case 't':
(void)sscanf(*argv, "%d", &sTimeoutMs);
argv += 1;
argc -= 1;
break;
case 'c':
sCondensed = 1;
break;
default:
ERR(("Do not know arg '%s'\n", str));
ShowHelp();
sExitAns = 1;
return (sExitAns);
}
}
return (kCIEnoErr);
}
/*--------------------------------------------------------------------------*/
static int CheckForKeyboardInput(void)
/*
** Poll stdin without blocking.
**
** Return 0 if no input is available from stdin, 1 else. When input is
** available one line (up to the buffer size) is consumed first. End of
** file or a read error on stdin also counts as input.
**
** Note: the console needs a newline in order to post input.
*/
{
    const int fd = fileno(stdin);
    fd_set readfds;
    struct timeval tv;
    char buff[1024];

    if (fd < 0)
    {
        /* stdin has no usable descriptor, so there is nothing to poll */
        return (0);
    }
    FD_ZERO(&readfds);
    FD_SET(fd, &readfds);
    /* a zero timeout makes select return immediately */
    tv.tv_sec = 0;
    tv.tv_usec = 0;
    /*
    ** The first argument of select is the highest descriptor plus one;
    ** derive it from stdin instead of assuming descriptor 0.
    */
    if ((select(fd + 1, &readfds, NULL, NULL, &tv) == 1) && FD_ISSET(fd, &readfds))
    {
        /*
        ** Consume the line.
        */
        if (NULL == fgets(buff, sizeof(buff), stdin))
        {
            /* EOF or error: still reported as input, as before */
        }
        return (1);
    }
    return (0);
}
/*==========================================================================*/
void* EventThread(void *d)
/*
** Wait for event packets continuously, or until count exceeds max.
*/
{
tCISHP hSig = (tCISHP)d;
tCIRC circ = kCIEnoErr;
tCIsignalEventData evtData;
tCiCXPeventPacket cxpEvtPkt;
tCiCXPeventMessage cxpEvtMsg;
tCIU32 i, j, byteCnt;
tCIU8 *bPtr;
tCIDOUBLE entryTime, exitTime;
tCIU8 dispPkt;
tCIU32 cnt;
/*
** Record the entry time.
*/
entryTime = GetTime();
/*
** Loop continuously until we reach an exit condition.
*/
while (1)
{
if (CheckForKeyboardInput())
{
SHOW(("Detected keyboard input\n"));
break;
}
/*
** Decide if we want to display this packet.
*/
dispPkt = (0 == sSkipPkts) || (0 == (sEvtPkts % (1 + sSkipPkts)));
/*
** Wait indefinitely for the signal to be emitted.
*/
circ = CiSignalWait(sCIp, hSig, sizeof(evtData), &evtData, -1);
if (kCIEcanceledErr == circ)
{
SHOW(("Signal canceled.\n"));
break;
}
else if (kCIEnoErr != circ)
{
ERR(("CiSignalWait gave '%s'\n", CiErrStr(circ)));
sExitAns = 1;
break;
}
if (dispPkt && !sCondensed)
{
SHOW(("CiSignalWait (0x%08X'%08X/%d) OK\n", (tCIU32)(evtData.timestamp >> 32), (tCIU32)(evtData.timestamp & 0xFFFFFFFF), evtData.count));
}
/*
** Read one event packet.
*/
circ = CiCXPreadEventPacket(sCIp, sCXPlink, sizeof(cxpEvtPkt), &cxpEvtPkt);
if (kCIEnoErr != circ)
{
ERR(("CiCXPreadEventPacket gave '%s'\n", CiErrStr(circ)));
sExitAns = 1;
break;
}
if (dispPkt)
{
if (!sCondensed)
{
// event packet data
SHOW(("Host connection ID: 0x%08X\n", cxpEvtPkt.masterHostConnectionID));
SHOW(("Packet Tag: 0x%08X\n", cxpEvtPkt.packetTag));
SHOW(("Data size: 0x%08X (words)\n", cxpEvtPkt.numWords));
}
else
{
SHOW(("%08d - Packet Tag: 0x%08X, Data size: 0x%08X, ", sEvtPkts, cxpEvtPkt.packetTag, cxpEvtPkt.numWords));
}
}
/*
** Extract the event packet data.
*/
i = 0;
byteCnt = 0;
do
{
circ = CiCXPparseEventPacket(sCIp, i, sizeof(cxpEvtPkt), &cxpEvtPkt, sizeof(cxpEvtMsg), &cxpEvtMsg);
if (kCIEnoErr != circ)
{
ERR(("CiCXPparseEventPacket(%d) gave '%s'\n", i, CiErrStr(circ)));
sExitAns = 1;
break;
}
else if (cxpEvtMsg.size > 0)
{
if (dispPkt && !sCondensed)
{
SHOW(("\nMessage number: %d\n", i));
SHOW(("Message Size: %d (bytes)\n", cxpEvtMsg.size));
SHOW(("Message Namespace: %d\n", cxpEvtMsg.nameSpace));
SHOW(("Event ID: %d\n", cxpEvtMsg.eventID));
SHOW(("Timestamp: 0x%08X'%08X\n", (tCIU32)(cxpEvtMsg.timeStamp >> 32), (tCIU32)(cxpEvtMsg.timeStamp & 0xffffffff)));
// sanity check on size
if (cxpEvtMsg.size > 0)
{
SHOW(("Data\n"));
/*
** Print each byte of the event packet.
*/
j = 0;
bPtr = (tCIU8*) cxpEvtMsg.data;
while (cxpEvtMsg.size > 12 + j && kCImaxEventPayload * sizeof(tCIU32) > byteCnt)
{
/*
** +12 to exclude 3 word header.
*/
SHOW(("%d 0x%02X\n", j, bPtr[j]));
/*
** +1 for the current byte.
*/
j++;
byteCnt++;
}
}
}
/*
** +12 for 3 word header.
*/
byteCnt += 12;
}
++i;
}
while (cxpEvtMsg.size > 0 && kCImaxEventPayload * sizeof(tCIU32) > byteCnt);
if (dispPkt && sCondensed)
{
SHOW(("Num Mess: %08d\r", i - 1));
}
++sEvtPkts;
cnt = ATOMIC_ADD(sRcvdPkts, 1);
if (0 != sMaxPkts && (cnt + 1) >= sMaxPkts)
{
SHOW(("Max count exceeded.\n"));
break;
}
}
/*
** Get the exit time, and print a summation message.
*/
exitTime = GetTime();
SHOW(("\nEvent thread leaves after %0.2f sec (%0.2f EPS)\n", (exitTime - entryTime), (tCIDOUBLE)sEvtPkts / (exitTime - entryTime)));
/*
** Cancel CiSignalExec (if we're the zero-index signal).
*/
CiSignalWaitCancel(sCIp, hSig);
return(NULL);
}
/*--------------------------------------------------------------------------*/
void* HearbeatThread(void *d)
/*
** Thread body that reports heartbeat packets.
**
** d is the signal handle (tCISHP) to wait on. Always returns NULL.
**
** Each pass polls the keyboard, waits on the signal, fetches the last
** heartbeat seen on sCXPlink and, depending on sSkipPkts, prints one line
** for it. sHrtBtPkts and the shared sRcvdPkts counter are bumped per packet.
**
** The loop stops on keyboard input, on a canceled wait, on a library error
** (sExitAns becomes 1) or once sMaxPkts packets have been counted.
*/
{
    tCISHP sigHandle = (tCISHP)d;
    tCIsignalEventData sigInfo;
    tCiCXPheartbeat hb;
    tCIRC rc = kCIEnoErr;
    tCIDOUBLE tStart, tEnd;
    tCIU32 prevTotal;
    tCIU8 show;

    /* remember when we started, for the rate summary on exit */
    tStart = GetTime();

    for (;;)
    {
        if (CheckForKeyboardInput())
        {
            SHOW(("Detected keyboard input\n"));
            break;
        }

        /* with skipping enabled, only every (1 + sSkipPkts)-th packet is shown */
        show = (0 == sSkipPkts) || (0 == (sHrtBtPkts % (1 + sSkipPkts)));

        /* block until the signal fires (timeout argument of -1) */
        rc = CiSignalWait(sCIp, sigHandle, sizeof(sigInfo), &sigInfo, -1);
        if (rc == kCIEcanceledErr)
        {
            SHOW(("Signal canceled.\n"));
            break;
        }
        if (rc != kCIEnoErr)
        {
            ERR(("CiSignalWait gave '%s'\n", CiErrStr(rc)));
            sExitAns = 1;
            break;
        }

        /* fetch the most recent heartbeat packet */
        rc = CiCXPgetLastHeartbeat(sCIp, sCXPlink, sizeof(hb), &hb);
        if (rc != kCIEnoErr)
        {
            ERR(("CiCXPgetLastHeartbeat gave '%s'\n", CiErrStr(rc)));
            sExitAns = 1;
            break;
        }

        if (show)
        {
            /* device time minus host time, printed as two 32 bit halves */
            tCIU64 delta = hb.packetDeviceTime - hb.packetHostTime;

            SHOW(("%08d - ID: 0x%08X PDT: 0x%08X'%08X PHT: 0x%08X'%08X Delta: 0x%08X'%08X\n",
                  sHrtBtPkts,
                  hb.packetConnectionID,
                  (tCIU32)(hb.packetDeviceTime >> 32),
                  (tCIU32)(hb.packetDeviceTime & 0xFFFFFFFF),
                  (tCIU32)(hb.packetHostTime >> 32),
                  (tCIU32)(hb.packetHostTime & 0xFFFFFFFF),
                  (tCIU32)(delta >> 32),
                  (tCIU32)(delta & 0xFFFFFFFF)));
        }

        ++sHrtBtPkts;

        /* prevTotal is the shared total before this packet was added */
        prevTotal = ATOMIC_ADD(sRcvdPkts, 1);
        if ((sMaxPkts != 0) && ((prevTotal + 1) >= sMaxPkts))
        {
            SHOW(("Max count exceeded.\n"));
            break;
        }
    }

    /* report how long we ran and the average packet rate */
    tEnd = GetTime();
    SHOW(("\nHeartbeat thread leaves after %0.2f sec (%0.2f EPS)\n", (tEnd - tStart), (tCIDOUBLE)sHrtBtPkts / (tEnd - tStart)));

    /*
    ** Cancel any wait on our signal. NOTE(review): the earlier comment here
    ** claimed this cancels CiSignalExec for the zero-index signal - confirm.
    */
    CiSignalWaitCancel(sCIp, sigHandle);
    return(NULL);
}
/*==========================================================================*/
int main (int argc, char *argv[])
/*
** Main method. Start the threads, then wait for an exit condition.
*/
{
tCIRC circ;
tCIU32 pktsLeft, wakeCnt, cnt;
int sigCnt = 0;
tCISHP hSigAry[2] = { NULL, NULL };
pthread_t thrdAry[2] = { 0, 0 };
/*
** Copy application path for help.
*/
sArgv0 = *argv;
/*
** Decode the command arguments
*/
if (DecodeArgs(argc, argv) != kCIEnoErr)
return(sExitAns);
/*
** Open the board.
*/
circ = CiVFGopen(sNdx, kCIBO_writeAccess, &sCIp);
if (kCIEnoErr != circ)
{
ERR(("CiVFGopen(%d) gave '%s'\n", sNdx, CiErrStr(circ)));
return(1);
}
/*
** Open the event signal, if enabled, and launch it's thread.
*/
if (sDoEvts)
{
SHOW(("Setup CXPeventRcvd on %d/%d.\n", sNdx, sCXPlink));
/*
** Setup the signal
*/
circ = CiSignalSetup(sCIp, kCIsigCXPeventRcvd, sCXPlink, &hSigAry[sigCnt]);
if (kCIEnoErr != circ)
{
ERR(("CiSignalSetup(%d/CXPeventRcvd/%d) gave '%s'\n", sNdx, sCXPlink, CiErrStr(circ)));
CiVFGclose(sCIp);
return(1);
}
/*
** Flush the event packet signal queue.
*/
CiSignalQueueOp(sCIp, hSigAry[sigCnt], kCISQflush, NULL);
/*
** Launch the thread.
*/
circ = pthread_create(&thrdAry[sigCnt], NULL, &EventThread, hSigAry[sigCnt]);
if (0 != circ)
{
circ = errno;
ERR(("pthread_create(EventThread) gave (%s)\n", CiErrStr(circ)));
CiSignalCleanup(sCIp, hSigAry[sigCnt], kCISQflush, NULL);
CiVFGclose(sCIp);
return(1);
}
sigCnt++;
}
/*
** Open the heartbeat signal, if enabled.
*/
if (sDoHrtBt)
{
SHOW(("Setup CXPhbRcvd on %d/%d.\n", sNdx, sCXPlink));
circ = CiSignalSetup(sCIp, kCIsigCXPhbRcvd, sCXPlink, &hSigAry[sigCnt]);
if (kCIEnoErr != circ)
{
ERR(("CiSignalSetup(%d/CXPhbRcvd/%d) gave '%s'\n", sNdx, sCXPlink, CiErrStr(circ)));
while (sigCnt-- > 0)
{
CiSignalWaitCancel(sCIp, hSigAry[sigCnt]);
pthread_join(thrdAry[sigCnt], NULL);
CiSignalCleanup(sCIp, hSigAry[sigCnt], kCISQflush, NULL);
}
CiVFGclose(sCIp);
return(1);
}
/*
** Flush the heartbeat signal queue.
*/
CiSignalQueueOp(sCIp, hSigAry[sigCnt], kCISQflush, NULL);
/*
** Launch the thread.
*/
circ = pthread_create(&thrdAry[sigCnt], NULL, &HearbeatThread, hSigAry[sigCnt]);
if (0 != circ)
{
circ = errno;
ERR(("pthread_create(EventThread) gave (%s)\n", CiErrStr(circ)));
while (sigCnt-- > 0)
{
CiSignalWaitCancel(sCIp, hSigAry[sigCnt]);
pthread_join(thrdAry[sigCnt], NULL);
CiSignalCleanup(sCIp, hSigAry[sigCnt], kCISQflush, NULL);
}
CiSignalCleanup(sCIp, hSigAry[sigCnt], kCISQflush, NULL);
CiVFGclose(sCIp);
return(1);
}
sigCnt++;
}
SHOW(("Executing signal service for (%d). Press key to exit.\n", sMaxPkts));
/*
** Wait for an error condition to occur.
*/
pktsLeft = sMaxPkts;
while (sExitAns == 0)
{
if (CheckForKeyboardInput())
{
SHOW(("Detected keyboard input\n"));
break;
}
/*
** See if we have satisfied the max packet count.
*/
if (0 != sMaxPkts)
{
const tCIU32 pktsRcvd = ATOMIC_ADD(sRcvdPkts, 0);
if (pktsRcvd >= sMaxPkts)
{
SHOW(("Max count exceeded.\n"));
break;
}
pktsLeft = sMaxPkts - pktsRcvd;
}
/*
** Service all enabled signals.
*/
circ = CiSignalExec(sCIp, sigCnt, hSigAry, pktsLeft, &cnt, sTimeoutMs);
wakeCnt += cnt;
if (kCIEcanceledErr == circ)
{
SHOW(("CiSignalExec canceled.\n"));
break;
}
else if (kCIEnoErr != circ)
{
sExitAns = 1;
ERR(("CiSignalExec gave (%s)\n", CiErrStr(circ)));
break;
}
}
/*
** Cleanup everything.
*/
while (sigCnt-- > 0)
{
CiSignalWaitCancel(sCIp, hSigAry[sigCnt]);
pthread_join(thrdAry[sigCnt], NULL);
CiSignalCleanup(sCIp, hSigAry[sigCnt], kCISQflush, NULL);
}
CiVFGclose(sCIp);
return(sExitAns);
}