1: # include "../ingres.h" 2: # include "../pipes.h" 3: 4: /* 5: ** Buffered Read From Pipe 6: ** 7: ** Reads from the pipe with the UNIX file descriptor `des' into 8: ** the buffer `buf'. `Mode' tells what is to be done. `Msg' is 9: ** the address of the input buffer, and `n' is the length of the 10: ** info. If `n' is zero, the data is assumed to be a null- 11: ** terminated string. 12: ** 13: ** `Buf' is defined in "../pipes.h" together with the modes. 14: ** The pipe should be written by wrpipe() to insure that the 15: ** pipe formats match. 16: ** 17: ** Modes: 18: ** 19: ** P_PRIME -- Primes the buffer. This involves cleaning out all 20: ** the header stuff so that a physical read is forced 21: ** on the next normal call to rdpipe(). It also resets 22: ** the error flag, etc. 23: ** 24: ** P_NORM -- This is the normal mode. If the buffer is empty, 25: ** it is filled from the pipe. `N' bytes are moved into 26: ** `msg'. If `n' is zero, bytes are moved into `msg' up 27: ** to and including the first null byte. The pipe is 28: ** read whenever necessary, so the size of the data 29: ** buffer (controlled by PBUFSIZ) has absolutely no 30: ** effect on operation. 31: ** 32: ** P_SYNC -- This mode reads the buffer, filling the pipe as 33: ** necessary, until an End Of Pipe (EOP) is encountered. 34: ** An EOP is defined as an empty buffer with a mode 35: ** ("hdrstat") of "END_STAT". Any data left in the pipe 36: ** is thrown away, but error messages are not. Anything 37: ** not already read by the user is read by P_SYNC, so if 38: ** you are not certain that you have already read the 39: ** previous EOP. 40: ** 41: ** P_EXECID -- The first (exec_id) field of the pipe buffer header 42: ** is read and returned. This is used (for instance) by 43: ** the DBU routines, which must get the exec_id, but must 44: ** not read the rest of the pipe in case they must overlay 45: ** themselves. It must ALWAYS be followed by a rdpipe of 46: ** mode P_FUNCID!! 47: ** 48: ** P_FUNCID -- The second through last bytes of the pipe are read 49: ** into the buffer, and the function code ("func_id") is 50: ** returned. This is the second half of a P_EXECID call. 51: ** 52: ** P_INT -- In the event of an interrupt, a wrpipe() call should 53: ** be made to all writable pipes of mode P_INT, and then 54: ** rdpipe() calls should be made on all readable pipes 55: ** of this mode. This clears out the pipe up until a 56: ** special type of pipe header ("SYNC_STAT") is read. It 57: ** is absolutely critical that this is called in conjunc- 58: ** tion with all other processes. This mode is used in 59: ** resyncpipes(), which performs all the correct calls 60: ** in the correct order. 61: ** 62: ** In all cases except P_INT mode, if an error is read from the 63: ** pipe it is automatically passed to proc_error(). This routine 64: ** is responsible for passing the error message to the next higher 65: ** process. 66: ** 67: ** If an end of file is read from the pipe, the end_job() routine 68: ** is called. This routine should do any necessary end of job 69: ** processing needed (closing relations, etc.) and return with 70: ** a zero value. 71: ** 72: ** Default proc_error() and end_job() routines exist in the 73: ** library, so if you don't want any extra processing, you can 74: ** just not bother to define them. 75: ** 76: ** In general, the number of bytes actually read is returned. If 77: ** you read an EOP, zero is returned. 78: */ 79: 80: extern int (*Exitfn)(); /* defined in syserr */ 81: extern int read(); /* standard read routine */ 82: int (*Pi_rd_fn)() = &read; /* fn to read from pipe */ 83: 84: rdpipe(mode, buf1, des, msg, n) 85: int mode; 86: char *msg; 87: int n; 88: int des; /* UNIX file descriptor for pipe */ 89: struct pipfrmt *buf1; 90: { 91: register int knt; 92: register int syncflg; 93: int i; 94: extern char *Proc_name; 95: register struct pipfrmt *buf; 96: 97: buf = buf1; 98: # ifdef xATR1 99: if (tTf(99, 0)) 100: printf("\n%s ent rdpipe md %d buf %u des %d\n", Proc_name, mode, buf, des); 101: # endif 102: syncflg = 0; 103: switch (mode) 104: { 105: 106: case P_PRIME: 107: buf->pbuf_pt = buf->buf_len = buf->err_id = 0; 108: buf->hdrstat = NORM_STAT; 109: return (0); 110: 111: case P_NORM: 112: break; 113: 114: case P_SYNC: 115: syncflg++; 116: break; 117: 118: case P_EXECID: 119: # ifdef xATR3 120: if (tTf(99, 1)) 121: printf("%s rdpipe ret %c\n", Proc_name, buf->exec_id); 122: # endif 123: syncflg++; 124: break; 125: 126: case P_FUNCID: 127: return (buf->func_id); 128: 129: case P_INT: 130: syncflg--; 131: buf->hdrstat = NORM_STAT; 132: break; 133: 134: case P_PARAM: 135: /* If there is currently some data, return param_id. 136: ** else read next block are return its param_id 137: */ 138: if (buf->pbuf_pt < buf->buf_len || buf->hdrstat == LAST_STAT) 139: return (buf->param_id); 140: break; 141: 142: default: 143: syserr("rdpipe: bad call %d", mode); 144: } 145: knt = 0; 146: 147: do 148: { 149: /* check for buffer empty */ 150: while (buf->pbuf_pt >= buf->buf_len || syncflg) 151: { 152: /* read a new buffer full */ 153: if (buf->hdrstat == LAST_STAT && syncflg >= 0) 154: { 155: goto out; 156: } 157: # ifdef xATR3 158: if (tTf(99, 2)) 159: { 160: printf("%s rdng %d w len %d pt %d err %d sf %d\n", 161: Proc_name, des, buf->buf_len, buf->pbuf_pt, buf->err_id, syncflg); 162: } 163: # endif 164: i = (*Pi_rd_fn)(des, buf, HDRSIZ+PBUFSIZ); 165: if (i == 0) 166: { 167: eoj: 168: # ifdef xATR3 169: if (tTf(99, 1)) 170: printf("%s rdpipe exit\n", Proc_name); 171: # endif 172: if (syncflg < 0) 173: syserr("rdpipe: EOF %d", des); 174: close(W_down); 175: if (des != R_up) 176: { 177: # ifdef xATR1 178: if (tTf(99, -1)) 179: syserr("EOF on buf=%u fd=%d", buf, des); 180: # endif 181: (*Exitfn)(-1); 182: } 183: exit(end_job()); 184: } 185: buf->pbuf_pt = 0; 186: # ifdef xATR2 187: if (tTf(99, 2)) 188: prpipe(buf); 189: # endif 190: if (i < HDRSIZ+PBUFSIZ) 191: syserr("rdpipe: rd err buf %u des %d", buf, des); 192: if (buf->hdrstat == SYNC_STAT) 193: { 194: if (syncflg < 0) 195: return (1); 196: else 197: syserr("rdpipe: unexpected SYNC_STAT"); 198: } 199: if (buf->err_id != 0 && syncflg >= 0) 200: { 201: # ifdef xATR2 202: if (tTf(99, 3)) 203: printf("%s rdpipe err %d\n", Proc_name, buf->err_id); 204: # endif 205: proc_error(buf, des); 206: # ifdef xATR3 207: if (tTf(99, 3)) 208: printf("%s pe ret\n", Proc_name); 209: # endif 210: buf->pbuf_pt = buf->buf_len; 211: buf->hdrstat = NORM_STAT; 212: continue; 213: } 214: if (mode == P_PARAM) 215: return (buf->param_id); 216: if (mode == P_EXECID) 217: return (buf->exec_id); 218: } 219: /* get a byte of information */ 220: msg[knt++] = buf->pbuf[buf->pbuf_pt++]; 221: } while ((n == 0) ? (msg[knt-1]) : (knt < n)); 222: 223: out: 224: # ifdef xATR1 225: if (tTf(99, 1)) 226: { 227: printf("%s rdpipe ret %d", Proc_name, knt); 228: if (n == 0 && syncflg == 0) 229: printf(", str `%s'", msg); 230: printf("\n"); 231: } 232: # endif 233: return(knt); 234: }