Index: src/backend/libpq/bufferedSock.c =================================================================== RCS file: /home/postgres/cvs_root/pgsql-r-v7.2/src/backend/libpq/bufferedSock.c,v retrieving revision 1.6 diff -c -r1.6 bufferedSock.c *** src/backend/libpq/bufferedSock.c 2002/06/09 10:06:28 1.6 --- src/backend/libpq/bufferedSock.c 2002/06/09 13:41:34 *************** *** 7,18 **** */ - #include - #if defined(HAVE_STRING_H) - #include - #else - #include - #endif #include #include #include --- 7,12 ---- *************** *** 38,56 **** #endif /* linux */ #include "postgres.h" #include "libpq/pqsignal.h" #include "replication/replication.h" - /* - #include "miscadmin.h" - #include "libpq/auth.h" - #include "libpq/libpq.h" - #include "libpq/pqcomm.h" - //#include "storage/ipc.h" - #ifdef MULTIBYTE - #include "mb/pg_wchar.h" - #endif - #include "utils/elog.h" - */ //#define EBUF_DEBUG /* --------------------------------------------------------------------- */ --- 32,40 ---- #endif /* linux */ #include "postgres.h" + #include "miscadmin.h" #include "libpq/pqsignal.h" #include "replication/replication.h" //#define EBUF_DEBUG /* --------------------------------------------------------------------- */ *************** *** 647,653 **** /* sockServerPort: sets up server port */ int ! sockServerPort(char *hostName, short portName, int *fdP, struct sockaddr *serverAddr) { SockAddr saddr; --- 631,637 ---- /* sockServerPort: sets up server port */ int ! sockServerPort(const char *hostName, short portName, int *fdP, struct sockaddr *serverAddr) { SockAddr saddr; *************** *** 656,662 **** family; size_t len; int one = 1; - char *UnixSocketDir = NULL; #ifdef HAVE_FCNTL_SETLK int lock_fd; #endif --- 640,645 ---- *************** *** 697,709 **** lck.l_whence = SEEK_SET; lck.l_start = lck.l_len = 0; lck.l_type = F_WRLCK; ! if (fcntl(lock_fd, F_SETLK, &lck) == 0) { ! TPRINTF(TRACE_VERBOSE, "flock on %s, deleting", sock_path); unlink(sock_path); } - else - TPRINTF(TRACE_VERBOSE, "flock failed for %s", sock_path); close(lock_fd); } #endif /* HAVE_FCNTL_SETLK */ --- 680,692 ---- lck.l_whence = SEEK_SET; lck.l_start = lck.l_len = 0; lck.l_type = F_WRLCK; ! if (fcntl(lock_fd, F_SETLK, &lck) < 0) ! elog(NOTICE, "flock failed for %s: %m", sock_path); ! else { ! elog(DEBUG, "flock on %s, deleting", sock_path); unlink(sock_path); } close(lock_fd); } #endif /* HAVE_FCNTL_SETLK */ *************** *** 717,723 **** err = bind(fd, &saddr.sa, len); if (err < 0) { ! elog(DEBUG, "bufferedSock: ServerPort: bind failed"); return STATUS_ERROR; } --- 700,706 ---- err = bind(fd, &saddr.sa, len); if (err < 0) { ! elog(DEBUG, "bufferedSock: ServerPort: bind failed: %m"); return STATUS_ERROR; } *************** *** 737,743 **** lck.l_whence = SEEK_SET; lck.l_start = lck.l_len = 0; lck.l_type = F_WRLCK; if (fcntl(lock_fd, F_SETLK, &lck) != 0) ! TPRINTF(TRACE_VERBOSE, "flock error for %s", sock_path); } #endif /* HAVE_FCNTL_SETLK */ } --- 720,726 ---- lck.l_whence = SEEK_SET; lck.l_start = lck.l_len = 0; lck.l_type = F_WRLCK; if (fcntl(lock_fd, F_SETLK, &lck) != 0) ! elog(NOTICE, "flock error for %s: %m", sock_path); } #endif /* HAVE_FCNTL_SETLK */ } *************** *** 828,840 **** /* sockClientConnect: connects to a server */ int ! sockClientConnect(char *hostName, short portName, bufsockptr bsock) { ACCEPT_TYPE_ARG3 len; int err; struct hostent *hp; extern int errno; - char *UnixSocketDir = NULL; /* set up the server (remote) address */ MemSet((char *) &bsock->raddr, 0, sizeof(bsock->raddr)); --- 811,822 ---- /* sockClientConnect: connects to a server */ int ! sockClientConnect(const char *hostName, short portName, bufsockptr bsock) { ACCEPT_TYPE_ARG3 len; int err; struct hostent *hp; extern int errno; /* set up the server (remote) address */ MemSet((char *) &bsock->raddr, 0, sizeof(bsock->raddr)); *************** *** 859,865 **** // DAR HACK fix to match UNIXSOCK_PATH as in pqcomm UNIXSOCK_PATH(bsock->raddr.un, portName, UnixSocketDir); len = UNIXSOCK_LEN(bsock->raddr.un); - //len = UNIXSOCK_PATH(bsock->raddr.un, portName); } /* connect to the server */ if ((bsock->sock = socket(bsock->raddr.sa.sa_family, SOCK_STREAM, 0)) < 0) --- 841,846 ---- Index: src/backend/postmaster/postmaster.c =================================================================== RCS file: /home/postgres/cvs_root/pgsql-r-v7.2/src/backend/postmaster/postmaster.c,v retrieving revision 1.6 diff -c -r1.6 postmaster.c *** src/backend/postmaster/postmaster.c 2002/06/09 06:45:44 1.6 --- src/backend/postmaster/postmaster.c 2002/06/09 13:41:34 *************** *** 2594,2601 **** on_exit_reset(); /* we don't want the postmaster's * proc_exit() handlers */ ! ! /* Close the postmater sockets (the replication manager port (5431) of the postmaster is not yet open, so no need to close it here*/ if (NetServer) StreamClose(ServerSock_INET); --- 2594,2600 ---- on_exit_reset(); /* we don't want the postmaster's * proc_exit() handlers */ ! /* Close the postmater sockets (the replication manager port of the postmaster is not yet open, so no need to close it here*/ if (NetServer) StreamClose(ServerSock_INET); *************** *** 2608,2613 **** --- 2607,2614 ---- /* Now, on to standard postgres stuff */ MyProcPid = getpid(); + + /* TODO: set ps display */ elog(DEBUG, "starting replication manager (pid = %d)", MyProcPid); Index: src/backend/replication/pg_ensemble_simple.c =================================================================== RCS file: /home/postgres/cvs_root/pgsql-r-v7.2/src/backend/replication/pg_ensemble_simple.c,v retrieving revision 1.1 diff -c -r1.1 pg_ensemble_simple.c *** src/backend/replication/pg_ensemble_simple.c 2002/02/08 02:06:26 1.1 --- src/backend/replication/pg_ensemble_simple.c 2002/06/09 13:41:34 *************** *** 29,37 **** #include "postgres.h" - #include - #include - #include #include #include #include --- 29,34 ---- Index: src/backend/replication/pg_ensemble_total.c =================================================================== RCS file: /home/postgres/cvs_root/pgsql-r-v7.2/src/backend/replication/pg_ensemble_total.c,v retrieving revision 1.1 diff -c -r1.1 pg_ensemble_total.c *** src/backend/replication/pg_ensemble_total.c 2002/02/08 02:06:26 1.1 --- src/backend/replication/pg_ensemble_total.c 2002/06/09 13:41:34 *************** *** 29,37 **** #include "postgres.h" - #include - #include - #include #include #include #include --- 29,34 ---- Index: src/backend/replication/pg_group_comm.c =================================================================== RCS file: /home/postgres/cvs_root/pgsql-r-v7.2/src/backend/replication/pg_group_comm.c,v retrieving revision 1.1 diff -c -r1.1 pg_group_comm.c *** src/backend/replication/pg_group_comm.c 2002/02/08 02:06:26 1.1 --- src/backend/replication/pg_group_comm.c 2002/06/09 13:41:34 *************** *** 29,37 **** #include "postgres.h" - #include - #include - #include #include #include #include --- 29,34 ---- Index: src/backend/replication/pg_spread.c =================================================================== RCS file: /home/postgres/cvs_root/pgsql-r-v7.2/src/backend/replication/pg_spread.c,v retrieving revision 1.5 diff -c -r1.5 pg_spread.c *** src/backend/replication/pg_spread.c 2002/06/09 06:45:44 1.5 --- src/backend/replication/pg_spread.c 2002/06/09 13:41:34 *************** *** 42,53 **** #include "postgres.h" #include - #include #include #include - #include - #include - #include #include #include #include --- 42,49 ---- *************** *** 178,186 **** ret = SP_connect(Spread_name, User, 0, 1, &Mbox, Private_group); ! if( ret != ACCEPT_SESSION ) { ! SP_error( ret ); Bye(); } --- 174,182 ---- ret = SP_connect(Spread_name, User, 0, 1, &Mbox, Private_group); ! if(ret != ACCEPT_SESSION) { ! SP_error(ret); Bye(); } *************** *** 202,215 **** //elog(NOTICE,"read thread created"); ! /* now start listening to the replication manager sockets, to whether * whether the replication manager wants to send some messages */ for(;;) { /* listen on both the simple and the total socket */ //elog(NOTICE,"I am at the beginning of the for loop"); ! memmove((char *) &tmp_rmask, (char *) &active_rsocks, ! sizeof(fd_set)); if(select(maxSock + 1, &tmp_rmask, (fd_set *) NULL, (fd_set *) NULL, (struct timeval *) NULL) < 0) { --- 198,210 ---- //elog(NOTICE,"read thread created"); ! /* now start listening to the replication manager sockets, to * whether the replication manager wants to send some messages */ for(;;) { /* listen on both the simple and the total socket */ //elog(NOTICE,"I am at the beginning of the for loop"); ! memmove((char *) &tmp_rmask, (char *) &active_rsocks, sizeof(fd_set)); if(select(maxSock + 1, &tmp_rmask, (fd_set *) NULL, (fd_set *) NULL, (struct timeval *) NULL) < 0) { *************** *** 218,224 **** elog(ERROR, "RmgrMain: select failed (errno=%d)", errno); return STATUS_ERROR; } ! //elog(NOTICE,"got mssg from replicaManager"); if (rmgrSock_simple != INVALID_SOCK && FD_ISSET(rmgrSock_simple, &tmp_rmask)) { --- 213,219 ---- elog(ERROR, "RmgrMain: select failed (errno=%d)", errno); return STATUS_ERROR; } ! elog(NOTICE,"got message from replicaManager"); if (rmgrSock_simple != INVALID_SOCK && FD_ISSET(rmgrSock_simple, &tmp_rmask)) { *************** *** 293,299 **** static void Read_message(void) { - char sender[MAX_GROUP_NAME]; char target_groups[MAX_GROUP_MEMBERS][MAX_GROUP_NAME]; int num_groups; --- 288,293 ---- *************** *** 317,330 **** &mess_type, &endian_mismatch, sizeof(read_buf), read_buf ); - if( read_msg_length < 0 ) { SP_error( read_msg_length ); Bye(); } //elog(NOTICE,"message received, msg length = %d", read_msg_length); ! /* forward a regular message to the appropriate total or simple socket to replica manager */ --- 311,323 ---- &mess_type, &endian_mismatch, sizeof(read_buf), read_buf ); if( read_msg_length < 0 ) { SP_error( read_msg_length ); Bye(); } //elog(NOTICE,"message received, msg length = %d", read_msg_length); ! /* forward a regular message to the appropriate total or simple socket to replica manager */ *************** *** 350,356 **** Bye(); } } ! } --- 343,349 ---- Bye(); } } ! } *************** *** 381,387 **** elog(NOTICE, "hostid %u, hostname = %s", current_group_hostids[i],current_group_hostnames[i]); } ! /* figure out the hosts that disappeared */ for (i=0,k=0;istate = CLOSED; sockPutInt((int) MSG_PROTO_ERROR, 4, conn->dc.bsock); sockFlush(conn->dc.bsock); --- 264,270 ---- static void HandleProtoError(Rconn *conn) { ! elog(NOTICE, "[%d] reports protocol error!!!", MyProcPid); conn->state = CLOSED; sockPutInt((int) MSG_PROTO_ERROR, 4, conn->dc.bsock); sockFlush(conn->dc.bsock); *************** *** 450,456 **** { if (errno == EINTR) continue; ! elog(ERROR, "RmgrMain: select failed (errno=%d)", errno); return STATUS_ERROR; } --- 450,456 ---- { if (errno == EINTR) continue; ! elog(ERROR, "RmgrMain: select failed: %m"); return STATUS_ERROR; } *************** *** 542,548 **** #ifdef RMGR_DEBUG elog(NOTICE, "Rmgr: done... (%d left in buffer)", sockNumLeft(GCconn_basic.bsock)); - elog(NOTICE, "\n"); #endif } --- 542,547 ---- *************** *** 838,844 **** * FIXME: replace this with GUC stuff */ hostname[MAX_HOSTNAMELEN + 1] = '\0'; ! strcpy(startpkt.database, getenv("REPL_DATABASE")); // -(ib) strcpy(startpkt.user, "postgres\0"); strcpy(startpkt.options, "-r -o /usr/local/pgsql/localog"); --- 837,844 ---- * FIXME: replace this with GUC stuff */ hostname[MAX_HOSTNAMELEN + 1] = '\0'; ! /* FIXME: what should this be? */ ! strcpy(startpkt.database, "template1"); strcpy(startpkt.user, "postgres\0"); strcpy(startpkt.options, "-r -o /usr/local/pgsql/localog"); *************** *** 1099,1107 **** handle_gc2rtxn(conn,GCconn_total.bsock); enableSocket(conn->dc.bsock->sock); } ! ! ! return TRUE; } } --- 1099,1105 ---- handle_gc2rtxn(conn,GCconn_total.bsock); enableSocket(conn->dc.bsock->sock); } ! return TRUE; } } *************** *** 2216,2228 **** disableSocket(int sock) { int i; - #ifdef RMGR_DEBUG elog(NOTICE, "disabling socket %d", sock); #endif - FD_CLR(sock, &active_rsocks); if(sock == maxSock) { --- 2214,2224 ---- *************** *** 2242,2253 **** static void rpmgr_die(SIGNAL_ARGS) { ! elog(NOTICE,"Replication Manager dying"); exit(1); } ! #ifdef RMGR_BLAXYZ //these stat fctns are disbled for now! /* statistic gathering functions*/ static void --- 2238,2249 ---- static void rpmgr_die(SIGNAL_ARGS) { ! elog(NOTICE, "Replication Manager dying"); exit(1); } ! #ifdef RMGR_BLAXYZ //these stat fctns are disabled for now! /* statistic gathering functions*/ static void Index: src/backend/replication/rmgrLib.c =================================================================== RCS file: /home/postgres/cvs_root/pgsql-r-v7.2/src/backend/replication/rmgrLib.c,v retrieving revision 1.1 diff -c -r1.1 rmgrLib.c *** src/backend/replication/rmgrLib.c 2002/02/08 02:06:26 1.1 --- src/backend/replication/rmgrLib.c 2002/06/09 13:41:34 *************** *** 6,12 **** * */ - #include // DAR HACK this is needed for SUN#include #include #include --- 6,11 ---- Index: src/backend/replication/writeset.c =================================================================== RCS file: /home/postgres/cvs_root/pgsql-r-v7.2/src/backend/replication/writeset.c,v retrieving revision 1.4 diff -c -r1.4 writeset.c *** src/backend/replication/writeset.c 2002/06/09 06:45:44 1.4 --- src/backend/replication/writeset.c 2002/06/09 13:41:35 *************** *** 7,31 **** * */ - - #include - #include - #include "postgres.h" #include "access/genam.h" #include "access/heapam.h" ! ! #include "replication/replication.h" #include "catalog/catname.h" #include "catalog/pg_index.h" - #include "utils/relcache.h" - #include "utils/fmgroids.h" - // DAR HACK c.h is in postgres.h #include "catalog/pg_operator.h" #include "nodes/execnodes.h" - #include "access/printtup.h" #include "nodes/parsenodes.h" ! #include "catalog/indexing.h" #include "miscadmin.h" /* DAR HACK these globals used to be in global.c and --- 7,26 ---- * */ #include "postgres.h" + #include "access/genam.h" #include "access/heapam.h" ! #include "access/printtup.h" #include "catalog/catname.h" + #include "catalog/indexing.h" #include "catalog/pg_index.h" #include "catalog/pg_operator.h" #include "nodes/execnodes.h" #include "nodes/parsenodes.h" ! #include "replication/replication.h" ! #include "utils/relcache.h" ! #include "utils/fmgroids.h" #include "miscadmin.h" /* DAR HACK these globals used to be in global.c and *************** *** 39,45 **** static void _execQuery(CmdInfo *cmd_info); static void _execUtility(CmdInfo *cmd); ! static RelInfo* _findrel(WriteSetPtr ws, char *relname); static void _freeCmdInfo(CmdInfo *cmd); static QueryInfo* _initQueryInfo(void); static void _freeQueryInfo(QueryInfo *qry); --- 34,40 ---- static void _execQuery(CmdInfo *cmd_info); static void _execUtility(CmdInfo *cmd); ! static RelInfo* _findrel(WriteSetPtr ws, const char *relname); static void _freeCmdInfo(CmdInfo *cmd); static QueryInfo* _initQueryInfo(void); static void _freeQueryInfo(QueryInfo *qry); *************** *** 245,251 **** elog(DEBUG, "Creating new RelInfo struct for relation %s", relname); #endif rel_info = (RelInfo *) malloc(sizeof(RelInfo)); ! strcpy(rel_info->name, relname); rel = RelationNameGetRelation(relname); if(!rel) elog(ERROR, "%s: Table doesn't exist", relname); --- 240,246 ---- elog(DEBUG, "Creating new RelInfo struct for relation %s", relname); #endif rel_info = (RelInfo *) malloc(sizeof(RelInfo)); ! strncpy(rel_info->name, relname, NAMEDATALEN + 1); rel = RelationNameGetRelation(relname); if(!rel) elog(ERROR, "%s: Table doesn't exist", relname); *************** *** 1153,1159 **** * RelInfo - the appropriate RelInfo struct, NULL if not found */ static RelInfo * ! _findrel(WriteSetPtr ws, char *relname) { Dlelem *curr; RelInfo *rel_info; --- 1148,1154 ---- * RelInfo - the appropriate RelInfo struct, NULL if not found */ static RelInfo * ! _findrel(WriteSetPtr ws, const char *relname) { Dlelem *curr; RelInfo *rel_info; *************** *** 1274,1280 **** if((found = HeapTupleIsValid(cTuple)) == TRUE) { classTuple = (Form_pg_class) GETSTRUCT(cTuple); ! strcpy(ind_info->indexName, classTuple->relname.data); ind_info->indexOid = RelationGetRelid( RelationNameGetRelation(ind_info->indexName)); /* DAR HACK this is not consistent with the --- 1269,1275 ---- if((found = HeapTupleIsValid(cTuple)) == TRUE) { classTuple = (Form_pg_class) GETSTRUCT(cTuple); ! strncpy(ind_info->indexName, classTuple->relname.data, NAMEDATALEN); ind_info->indexOid = RelationGetRelid( RelationNameGetRelation(ind_info->indexName)); /* DAR HACK this is not consistent with the *************** *** 1942,1950 **** { if(doTxnDelim) { - /* DAR HACK TPRINTF and TRACE_VERBOSE doesn't exist - TPRINTF(TRACE_VERBOSE, "StartTransactionCommand"); - */ elog(NOTICE,"writeset.c. StartTransactoinCommona"); StartTransactionCommand(); } --- 1937,1942 ---- *************** *** 1965,1974 **** } if(doTxnDelim) { ! /* DAR HACK TPRINTF and TRACE_VERBOSE doesn't exist ! TPRINTF(TRACE_VERBOSE, "CommitTransactionCommand"); ! */ ! elog(NOTICE,"writeset.c committransactioncommand"); CommitTransactionCommand(); } DLRemove(curr); --- 1957,1963 ---- } if(doTxnDelim) { ! elog(NOTICE,"writeset.c CommitTransactionCommand"); CommitTransactionCommand(); } DLRemove(curr); Index: src/include/libpq/bufferedSock.h =================================================================== RCS file: /home/postgres/cvs_root/pgsql-r-v7.2/src/include/libpq/bufferedSock.h,v retrieving revision 1.1 diff -c -r1.1 bufferedSock.h *** src/include/libpq/bufferedSock.h 2002/02/08 02:20:15 1.1 --- src/include/libpq/bufferedSock.h 2002/06/09 13:41:35 *************** *** 121,129 **** extern void sockPutIntE(int value, int bytes, ebufptr extbuf); extern void sockFlushE(ebufptr extbuf, bufsockptr bsock, bool flushBeforePipe); ! extern int sockServerPort(char *hostName, short portName, int *fdP, struct sockaddr *serverAddr); extern int sockServerAccept(int server_fd, bufsockptr bsock); ! extern int sockClientConnect(char *hostName, short portName, bufsockptr bsock); extern void sockClose(bufsockptr bsock); extern void sockUnlink(char path[]); --- 121,129 ---- extern void sockPutIntE(int value, int bytes, ebufptr extbuf); extern void sockFlushE(ebufptr extbuf, bufsockptr bsock, bool flushBeforePipe); ! extern int sockServerPort(const char *hostName, short portName, int *fdP, struct sockaddr *serverAddr); extern int sockServerAccept(int server_fd, bufsockptr bsock); ! extern int sockClientConnect(const char *hostName, short portName, bufsockptr bsock); extern void sockClose(bufsockptr bsock); extern void sockUnlink(char path[]);