The steps are:
1. Retrieve the first message from the queue using getq(9F).
2. If the message is high priority, process it immediately and pass it along the stream.
3. Otherwise, the service procedure should use canputnext(9F) to determine if the next module or driver that enqueues messages is within acceptable flow-control limits. canputnext(9F) searches the stream for the next module, driver, or the stream head with a service procedure. When it finds one, it looks at the total message space currently allocated to the queue for messages. If the amount of space currently used at that queue reaches the high-water mark, canputnext(9F) returns false (zero). If the next queue with a service procedure is within acceptable flow-control limits, canputnext(9F) returns true (nonzero).
4. If canputnext(9F) returns false, the service procedure returns the message to its own queue with putbq(9F). The service procedure can do no further processing at this time, and it returns to the caller.
5. If canputnext(9F) returns true, the service procedure completes any processing of the message. This can involve retrieving more messages from the queue, allocating and deallocating header and trailer information, and performing control functions for the module.
6. When the service procedure is finished processing the message, it calls putnext(9F) to pass the resulting message to the next queue.
These steps are repeated until getq(9F) returns NULL (the queue is empty) or canputnext(9F) returns false.
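
The loop described by these steps can be sketched in userland as follows. This is only an illustration under stated assumptions, not kernel code: every toy_ name is hypothetical, messages are small structs instead of mblk_t chains, and the high-water mark counts messages, whereas a real queue accounts for message bytes.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

#define TOY_QLEN 16                 /* capacity of the toy queue (assumption) */

struct toy_msg {
    int high_pri;                   /* nonzero for a high-priority message */
    int data;
};

struct toy_queue {
    struct toy_msg msgs[TOY_QLEN];  /* pending messages, oldest first */
    size_t count;                   /* number of pending messages */
    size_t hiwat;                   /* toy high-water mark, in messages */
};

/* Stand-in for getq(9F): remove the first message; return 0 if empty. */
static int toy_getq(struct toy_queue *q, struct toy_msg *out)
{
    if (q->count == 0)
        return 0;
    *out = q->msgs[0];
    q->count--;
    memmove(&q->msgs[0], &q->msgs[1], q->count * sizeof(q->msgs[0]));
    return 1;
}

/* Stand-in for putbq(9F): return a message to the head of the queue. */
static void toy_putbq(struct toy_queue *q, struct toy_msg m)
{
    assert(q->count < TOY_QLEN);
    memmove(&q->msgs[1], &q->msgs[0], q->count * sizeof(q->msgs[0]));
    q->msgs[0] = m;
    q->count++;
}

/* Stand-in for delivery to the next queue: append at the tail. */
static void toy_putq(struct toy_queue *q, struct toy_msg m)
{
    assert(q->count < TOY_QLEN);
    q->msgs[q->count++] = m;
}

/* Stand-in for canputnext(9F): false once the high-water mark is reached. */
static int toy_canput(const struct toy_queue *next)
{
    return next->count < next->hiwat;
}

/* Service loop: returns the number of messages passed downstream. */
static size_t toy_service(struct toy_queue *q, struct toy_queue *next)
{
    struct toy_msg m;
    size_t forwarded = 0;

    while (toy_getq(q, &m)) {
        /* Ordinary messages are subject to flow control. */
        if (!m.high_pri && !toy_canput(next)) {
            toy_putbq(q, m);        /* put it back and stop for now */
            break;
        }
        toy_putq(next, m);          /* high priority, or room downstream */
        forwarded++;
    }
    return forwarded;
}
```

With four ordinary messages queued and a downstream mark of two, one call to toy_service forwards two messages and leaves the third at the head of the queue, ready for the next run.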
Filter Module Example
The crmod module shown in Example 10-4 is an asymmetric filter. On the write side, each newline is changed to a carriage return followed by a newline. On the read side, no conversion is done.
Example 10-4 Filter Module
/*
 * Simple filter
 * converts newline -> carriage return, newline
 */
#include <sys/types.h>
#include <sys/param.h>
#include <sys/stream.h>
#include <sys/stropts.h>
#include <sys/ddi.h>
#include <sys/sunddi.h>

static struct module_info minfo =
    { 0x09, "crmod", 0, INFPSZ, 512, 128 };

static int modopen(queue_t *, dev_t *, int, int, cred_t *);
static int modrput(queue_t *, mblk_t *);
static int modwput(queue_t *, mblk_t *);
static int modwsrv(queue_t *);
static int modclose(queue_t *, int, cred_t *);

static struct qinit rinit = {
    modrput, NULL, modopen, modclose, NULL, &minfo, NULL
};

static struct qinit winit = {
    modwput, modwsrv, NULL, NULL, NULL, &minfo, NULL
};

struct streamtab crmdinfo = { &rinit, &winit, NULL, NULL };
stropts.h includes definitions of flush message options common to user applications. modrput is like modput from the null module.
In contrast to the null module example, a single module_info structure is shared by the read side and the write side. The module_info includes the flow-control high-water and low-water marks (512 and 128) for the write queue. (Although the same module_info is used on the read side, the read side has no service procedure, so flow control is not used there.) The write-side qinit, winit, contains the service procedure pointer, modwsrv.
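
To make the positional initializer of minfo easier to read, the sketch below spells out each value by field name. It is a userland mirror written for illustration, not the kernel definition: toy_module_info, TOY_INFPSZ, and toy_marks_ordered are hypothetical names, and the field names and types are assumed to follow module_info(9S).

```c
#include <stddef.h>
#include <sys/types.h>              /* ssize_t */

#define TOY_INFPSZ (-1)             /* stand-in for INFPSZ: no packet size limit */

/* Userland mirror of the module_info layout (assumed from module_info(9S)). */
struct toy_module_info {
    unsigned short mi_idnum;        /* module ID number */
    const char *mi_idname;          /* module name */
    ssize_t mi_minpsz;              /* minimum packet size */
    ssize_t mi_maxpsz;              /* maximum packet size */
    size_t mi_hiwat;                /* high-water mark */
    size_t mi_lowat;                /* low-water mark */
};

/* The same values as minfo in Example 10-4, with each field named. */
static const struct toy_module_info toy_minfo = {
    .mi_idnum  = 0x09,
    .mi_idname = "crmod",
    .mi_minpsz = 0,
    .mi_maxpsz = TOY_INFPSZ,
    .mi_hiwat  = 512,
    .mi_lowat  = 128,
};

/* Sanity check: the low-water mark should not exceed the high-water mark. */
static int toy_marks_ordered(const struct toy_module_info *mi)
{
    return mi->mi_lowat <= mi->mi_hiwat;
}
```

Calling toy_marks_ordered(&toy_minfo) returns nonzero for the values used here, because 128 is below 512.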
The write-side put procedure, the beginning of the service procedure, and an example of flushing a queue are shown
in Example 10-5.
Example 10-5 Flushing a Queue
static int
modwput(queue_t *q, mblk_t *mp)
{
    /* High-priority messages other than M_FLUSH bypass the queue */
    if (mp->b_datap->db_type >= QPCTL && mp->b_datap->db_type != M_FLUSH)
        putnext(q, mp);
    else
        putq(q, mp);    /* Put it on the queue */
    return (0);
}

/*
 * bappend() is the module's own helper and is not shown in this example.
 * It appends one byte to nbp and, as used here, returns 0 on failure.
 */
static int
modwsrv(queue_t *q)
{
    mblk_t *mp;

    while ((mp = getq(q)) != NULL) {
        switch (mp->b_datap->db_type) {
        default:
            if (canputnext(q)) {
                putnext(q, mp);
                break;
            } else {
                putbq(q, mp);
                return (0);
            }

        case M_FLUSH:
            if (*mp->b_rptr & FLUSHW)
                flushq(q, FLUSHDATA);
            putnext(q, mp);
            break;

        case M_DATA: {
            mblk_t *nbp = NULL;
            mblk_t *next;

            if (!canputnext(q)) {
                putbq(q, mp);
                return (0);
            }
            /* Filter data, appending to queue */
            for (; mp != NULL; mp = next) {
                while (mp->b_rptr < mp->b_wptr) {
                    if (*mp->b_rptr == '\n')
                        if (!bappend(&nbp, '\r'))
                            goto push;
                    if (!bappend(&nbp, *mp->b_rptr))
                        goto push;
                    mp->b_rptr++;
                    continue;
                push:
                    /* Send what has been built so far */
                    if (nbp)
                        putnext(q, nbp);
                    nbp = NULL;
                    if (!canputnext(q)) {
                        if (mp->b_rptr >= mp->b_wptr) {
                            next = mp->b_cont;
                            freeb(mp);
                            mp = next;
                        }
                        if (mp)
                            putbq(q, mp);
                        return (0);
                    }
                } /* while */
                next = mp->b_cont;
                freeb(mp);
                if (nbp)
                    putnext(q, nbp);
                /* nbp now belongs to the next queue; do not reuse it */
                nbp = NULL;
            } /* for */
            break;
        }
        } /* switch */
    }
    return (0);
}
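
The newline conversion that the M_DATA case performs can also be tried outside the kernel. The following is a minimal userland sketch, not part of crmod: toy_crmod is a hypothetical helper that works on plain byte buffers instead of mblk_t chains, and it reports a full output buffer to the caller rather than pushing partial output downstream.

```c
#include <assert.h>
#include <stddef.h>

/*
 * Copy srclen bytes from src to dst, inserting '\r' before every '\n'.
 * Returns the number of bytes written, or (size_t)-1 if dst is too small.
 */
static size_t toy_crmod(const char *src, size_t srclen,
    char *dst, size_t dstcap)
{
    size_t out = 0;

    assert(src != NULL && dst != NULL);
    for (size_t i = 0; i < srclen; i++) {
        /* A newline needs room for two output bytes, anything else one. */
        size_t need = (src[i] == '\n') ? 2 : 1;

        if (dstcap - out < need)
            return (size_t)-1;
        if (src[i] == '\n')
            dst[out++] = '\r';
        dst[out++] = src[i];
    }
    return out;
}
```

Checking the space for both bytes before writing either one is a design choice of this sketch: it keeps the carriage return and the newline together when the output runs out of room.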