<!--$Id: ex_rq.so,v 1.4 2002/06/24 14:50:48 bostic Exp $-->
<!--Copyright 1997-2004 by Sleepycat Software, Inc.-->
<!--All rights reserved.-->
<!--See the file LICENSE for redistribution information.-->
<html>
<head>
<title>Berkeley DB Reference Guide: Ex_repquote: putting it all together</title>
<meta name="description" content="Berkeley DB: An embedded database programmatic toolkit.">
<meta name="keywords" content="embedded,database,programmatic,toolkit,btree,hash,hashing,transaction,transactions,locking,logging,access method,access methods,Java,C,C++">
</head>
<body bgcolor=white>
<table width="100%"><tr valign=top>
<td><h3><dl><dt>Berkeley DB Reference Guide:<dd>Berkeley DB Replication</dl></h3></td>
<td align=right><a href="../rep/ex_comm.html"><img src="../../images/prev.gif" alt="Prev"></a><a href="../toc.html"><img src="../../images/ref.gif" alt="Ref"></a><a href="../xa/intro.html"><img src="../../images/next.gif" alt="Next"></a>
</td></tr></table>
<p>
<h3 align=center>Ex_repquote: putting it all together</h3>
<p>A replicated application must initialize a replicated environment, set
up its communication infrastructure, and then make sure that incoming
messages are received and processed.</p>
<p>To initialize replication, ex_repquote creates a Berkeley DB environment and
calls <a href="../../api_c/rep_transport.html">DB_ENV->set_rep_transport</a> to establish a send function. The following
code fragment (from the env_init function, found in
<b>ex_repquote/ex_rq_main.c</b>) demonstrates this. Before calling
env_init, the application calls machtab_init to initialize its
environment-ID-to-port mapping structure, which it then passes to
env_init.</p>
<pre><blockquote>if ((ret = db_env_create(&dbenv, 0)) != 0) {
	fprintf(stderr, "%s: env create failed: %s\n",
	    progname, db_strerror(ret));
	return (ret);
}
dbenv->set_errfile(dbenv, stderr);
dbenv->set_errpfx(dbenv, prefix);
(void)dbenv->set_cachesize(dbenv, 0, CACHESIZE, 0);
<p>
dbenv->app_private = machtab;
(void)dbenv->set_rep_transport(dbenv, SELF_EID, quote_send);
<p>
flags = DB_CREATE | DB_THREAD |
    DB_INIT_LOCK | DB_INIT_LOG | DB_INIT_MPOOL | DB_INIT_TXN;
<p>
ret = dbenv->open(dbenv, home, flags, 0);</blockquote></pre>
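<p>The mapping structure stored in app_private is defined by the
application, not by Berkeley DB. The following is a minimal sketch of what
such a table could look like, not the machtab code that ships with
ex_repquote. The names site_table, site_table_add, site_table_fd and
site_table_remove are hypothetical, as are the fixed capacity and the
assumption that the local site uses environment ID 1. The sketch does no
locking, so a threaded application would have to protect it with a
mutex.</p>

```c
#include <stddef.h>

#define SITE_TABLE_MAX 16	/* Assumed capacity for this sketch. */
#define FIRST_REMOTE_EID 2	/* Assumes the local site uses ID 1. */

typedef struct {
	int eid;	/* Environment ID handed out for the remote site. */
	int fd;		/* Socket descriptor used to reach that site. */
	int in_use;	/* Nonzero if this slot holds a live entry. */
} site_entry;

typedef struct {
	site_entry sites[SITE_TABLE_MAX];
	int next_eid;	/* Next environment ID to hand out. */
} site_table;

/* Mark every slot free and reset the ID counter. */
void
site_table_init(site_table *t)
{
	size_t i;

	for (i = 0; i < SITE_TABLE_MAX; i++)
		t->sites[i].in_use = 0;
	t->next_eid = FIRST_REMOTE_EID;
}

/* Register a connection; return its new environment ID, or -1 if full. */
int
site_table_add(site_table *t, int fd)
{
	size_t i;

	for (i = 0; i < SITE_TABLE_MAX; i++)
		if (!t->sites[i].in_use) {
			t->sites[i].eid = t->next_eid++;
			t->sites[i].fd = fd;
			t->sites[i].in_use = 1;
			return (t->sites[i].eid);
		}
	return (-1);
}

/* Return the descriptor for an environment ID, or -1 if unknown. */
int
site_table_fd(const site_table *t, int eid)
{
	size_t i;

	for (i = 0; i < SITE_TABLE_MAX; i++)
		if (t->sites[i].in_use && t->sites[i].eid == eid)
			return (t->sites[i].fd);
	return (-1);
}

/* Forget a site; return 0 on success, -1 if the ID is unknown. */
int
site_table_remove(site_table *t, int eid)
{
	size_t i;

	for (i = 0; i < SITE_TABLE_MAX; i++)
		if (t->sites[i].in_use && t->sites[i].eid == eid) {
			t->sites[i].in_use = 0;
			return (0);
		}
	return (-1);
}
```

<p>With a table like this reachable through app_private, a send function
could translate the environment ID it is given into a socket descriptor
before writing the message.</p>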
<p>ex_repquote opens a listening socket for incoming connections and opens
an outgoing connection to every machine that it knows about (that is,
all the sites listed in the <b>-o</b> command line argument).
Applications can structure the details of this in different ways.
ex_repquote creates one user-level thread to listen on its socket and
one thread per socket to loop and handle incoming messages. These run
alongside the threads that any database application would need anyway:
those that manage the user interface, update the database on the
master, and read from the database on the client.</p>
<p>Once the initial threads have all been started and the communications
infrastructure is initialized, the application signals that it is ready
for replication and joins a replication group by calling
<a href="../../api_c/rep_start.html">DB_ENV->rep_start</a>:</p>
<pre><blockquote>if (whoami == MASTER) {
	if ((ret = dbenv->rep_start(dbenv, NULL, DB_REP_MASTER)) != 0) {
		/* Complain and exit on error. */
	}
	/* Go run the master application code. */
} else {
	memset(&local, 0, sizeof(local));
	local.data = myaddr;
	local.size = strlen(myaddr) + 1;
	if ((ret =
	    dbenv->rep_start(dbenv, &local, DB_REP_CLIENT)) != 0) {
		/* Complain and exit on error. */
	}
	/* Sleep briefly to give ourselves time to find a master. */
	sleep(5);
	/* Go run the client application code. */
}</blockquote></pre>
<p>Note the use of the optional second argument to <a href="../../api_c/rep_start.html">DB_ENV->rep_start</a> in
the client initialization code. The argument "myaddr" is a piece of
data, opaque to Berkeley DB, that is broadcast to each member of the
replication group. It allows a new client to join a replication group
without knowing the location of all its members: members that the new
client does not know about receive the contact information specified in
"myaddr" and use it to contact the new client.
See <a href="../../ref/rep/newsite.html">Connecting to a new site</a> for more
information.</p>
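<p>Because the contact information is opaque to Berkeley DB, the
application decides how to encode and decode it. The following sketch
assumes a NUL-terminated "host:port" string, as the fragments on this page
suggest. The blob type is a stand-in for the data/size pair of a DBT, and
blob, site_addr, addr_to_blob and blob_to_site are hypothetical names, not
part of Berkeley DB or ex_repquote. Unlike a strtok-based parser, this
version leaves the received buffer unmodified.</p>

```c
#include <stdlib.h>
#include <string.h>

/* Stand-in for the data/size pair of a DBT; not the Berkeley DB type. */
typedef struct {
	void *data;
	size_t size;
} blob;

typedef struct {
	char host[64];	/* Assumed maximum host name length. */
	int port;
} site_addr;

/* Wrap a NUL-terminated "host:port" string, including its terminator. */
blob
addr_to_blob(char *addr)
{
	blob b;

	b.data = addr;
	b.size = strlen(addr) + 1;
	return (b);
}

/*
 * Parse "host:port" out of a received blob without modifying it.
 * Returns 0 on success, -1 if the contents are malformed.
 */
int
blob_to_site(const blob *b, site_addr *out)
{
	const char *s = b->data;
	const char *colon;
	char *end;
	size_t hostlen;
	long port;

	/* Require a terminator so the string functions stay in bounds. */
	if (b->size == 0 || s[b->size - 1] != '\0')
		return (-1);
	if ((colon = strchr(s, ':')) == NULL)
		return (-1);
	hostlen = (size_t)(colon - s);
	if (hostlen == 0 || hostlen >= sizeof(out->host))
		return (-1);

	port = strtol(colon + 1, &end, 10);
	if (end == colon + 1 || *end != '\0' || port <= 0 || port > 65535)
		return (-1);

	memcpy(out->host, s, hostlen);
	out->host[hostlen] = '\0';
	out->port = (int)port;
	return (0);
}
```

<p>A site receiving new-site contact information could pass the record's
data and size through blob_to_site and hand the resulting host and port to
its own connection routine.</p>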
<p>The final piece of a replicated application is the code that loops,
receives, and processes messages from a given remote environment.
ex_repquote runs one of these loops in a parallel thread for each socket
connection. Other applications may want to queue messages and process
them asynchronously, or call select() on a number of sockets and
either look up the correct environment ID for each or encapsulate the
ID in the communications protocol. The details may thus vary from
application to application, but in ex_repquote the message-handling loop
is as follows (code fragment from the hm_loop function, found in
<b>ex_repquote/ex_rq_util.c</b>):</p>
<pre><blockquote>DB_ENV *dbenv;
DBT rec, control; /* Structures encapsulating a received message. */
elect_args *ea; /* Parameters to the elect thread. */
machtab_t *tab; /* The environment ID to fd mapping table. */
pthread_t elect_thr; /* Election thread spawned. */
repsite_t self; /* My host and port identification. */
int eid; /* Environment from whom I am receiving messages. */
int fd; /* FD on which I am receiving messages. */
int master_eid; /* Global indicating the current master eid. */
int n; /* Number of sites; obtained from machtab_parm. */
int newm; /* New master EID. */
int open; /* Boolean indicating if connection already exists. */
int pri; /* My priority. */
int r, ret; /* Return values. */
int timeout; /* My election timeout value. */
int tmpid; /* Used to call dbenv->rep_process_message. */
char *c; /* Temp used in parsing host:port names. */
char *myaddr; /* My host/port address. */
char *progname; /* Program name for error messages. */
void *status; /* Pthread return status. */
for (ret = 0; ret == 0;) {
	if ((ret = get_next_message(fd, &rec, &control)) != 0) {
		/*
		 * There was some sort of network error; close this
		 * connection and remove it from the table of
		 * environment IDs.
		 */
		close(fd);
		if ((ret = machtab_rem(tab, eid, 1)) != 0)
			break;
<p>
		/*
		 * If I'm the master, I just lost a client and this
		 * thread is done.
		 */
		if (master_eid == SELF_EID)
			break;
<p>
		/*
		 * If I was talking with the master and the master
		 * went away, I need to call an election; else I'm
		 * done.
		 */
		if (master_eid != eid)
			break;
<p>
		master_eid = DB_EID_INVALID;
		/*
		 * In ex_repquote, the environment ID table stores
		 * election parameters.
		 */
		machtab_parm(tab, &n, &pri, &timeout);
		if ((ret = dbenv->rep_elect(dbenv,
		    n, 0, pri, timeout, &newm, 0)) != 0)
			continue;
<p>
		/*
		 * If I won the election, become the master.
		 * Otherwise, just exit.
		 */
		if (newm == SELF_EID && (ret =
		    dbenv->rep_start(dbenv, NULL, DB_REP_MASTER)) == 0)
			ret = domaster(dbenv, progname);
		break;
	}
<p>
	/* If we get here, we have a message to process. */
<p>
	tmpid = eid;
	switch(r = dbenv->rep_process_message(dbenv,
	    &control, &rec, &tmpid, NULL)) {
	case DB_REP_NEWSITE:
		/*
		 * Check if we got sent connect information and if we
		 * did, if this is me or if we already have a
		 * connection to this new site. If we don't,
		 * establish a new one.
		 */
<p>
		/* No connect info. */
		if (rec.size == 0)
			break;
<p>
		/* It's me, do nothing. */
		if (strncmp(myaddr, rec.data, rec.size) == 0)
			break;
<p>
		self.host = (char *)rec.data;
		self.host = strtok(self.host, ":");
		if ((c = strtok(NULL, ":")) == NULL) {
			dbenv->errx(dbenv, "Bad host specification");
			goto err;
		}
		self.port = atoi(c);
<p>
		/*
		 * We try to connect to the new site. If we can't,
		 * we treat it as an error since we know that the site
		 * should be up if we got a message from it (even
		 * indirectly).
		 */
		if ((ret = connect_site(dbenv,
		    tab, progname, &self, &open, &eid)) != 0)
			goto err;
		break;
	case DB_REP_HOLDELECTION:
		if (master_eid == SELF_EID)
			break;
		/* Make sure that previous election has finished. */
		if (ea != NULL) {
			(void)pthread_join(elect_thr, &status);
			ea = NULL;
		}
		if ((ea = calloc(1, sizeof(elect_args))) == NULL) {
			ret = errno;
			goto err;
		}
		ea->dbenv = dbenv;
		ea->machtab = tab;
		ret = pthread_create(&elect_thr,
		    NULL, elect_thread, (void *)ea);
		break;
	case DB_REP_NEWMASTER:
		/* Check if it's us. */
		master_eid = tmpid;
		if (tmpid == SELF_EID) {
			if ((ret = dbenv->rep_start(dbenv,
			    NULL, DB_REP_MASTER)) != 0)
				goto err;
			ret = domaster(dbenv, progname);
		}
		break;
	case 0:
		break;
	default:
		dbenv->err(dbenv, r, "DB_ENV->rep_process_message");
		break;
	}
}</blockquote></pre>
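<p>The loop above relies on get_next_message, which is not shown on this
page. The following is a minimal sketch of how the receiving side of such
a function could be written, assuming a hypothetical wire format in which
the record and then the control structure are each sent as a 4-byte
big-endian length followed by that many bytes. The names msg_part,
MAX_PART, read_full, recv_part and next_message are illustrative only, and
the actual ex_repquote framing may differ.</p>

```c
#include <arpa/inet.h>	/* ntohl */
#include <errno.h>
#include <stdint.h>
#include <stdlib.h>
#include <unistd.h>

#define MAX_PART (1u << 20)	/* Assumed upper bound on one part. */

/* Stand-in for the data/size pair of a DBT; not the Berkeley DB type. */
typedef struct {
	void *data;
	uint32_t size;
} msg_part;

/* Read exactly len bytes; return 0 on success, -1 on EOF or error. */
static int
read_full(int fd, void *buf, size_t len)
{
	char *p = buf;
	ssize_t n;

	while (len > 0) {
		if ((n = read(fd, p, len)) < 0 && errno == EINTR)
			continue;	/* Interrupted; try again. */
		if (n <= 0)
			return (-1);
		p += n;
		len -= (size_t)n;
	}
	return (0);
}

/* Read one length-prefixed part into freshly allocated memory. */
static int
recv_part(int fd, msg_part *part)
{
	uint32_t be;

	part->data = NULL;
	part->size = 0;
	if (read_full(fd, &be, sizeof(be)) != 0)
		return (-1);
	if ((part->size = ntohl(be)) == 0)
		return (0);	/* An empty part is legal. */
	if (part->size > MAX_PART ||
	    (part->data = malloc(part->size)) == NULL ||
	    read_full(fd, part->data, part->size) != 0) {
		free(part->data);
		part->data = NULL;
		part->size = 0;
		return (-1);
	}
	return (0);
}

/*
 * Read the record and then the control structure.  On success the
 * caller owns rec->data and control->data and must free() them.
 */
int
next_message(int fd, msg_part *rec, msg_part *control)
{
	if (recv_part(fd, rec) != 0)
		return (-1);
	if (recv_part(fd, control) != 0) {
		free(rec->data);
		rec->data = NULL;
		rec->size = 0;
		return (-1);
	}
	return (0);
}
```

<p>A nonzero return covers both a closed connection and a malformed
message, which matches the way the loop above treats any failure to read a
message as a lost connection. The sending side would have to write the two
parts in the same order and with the same length prefix.</p>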
<table width="100%"><tr><td><br></td><td align=right><a href="../rep/ex_comm.html"><img src="../../images/prev.gif" alt="Prev"></a><a href="../toc.html"><img src="../../images/ref.gif" alt="Ref"></a><a href="../xa/intro.html"><img src="../../images/next.gif" alt="Next"></a>
</td></tr></table>
<p><font size=1><a href="../../sleepycat/legal.html">Copyright (c) 1996-2004</a> <a href="http://www.sleepycat.com">Sleepycat Software, Inc.</a> - All rights reserved.</font>
</body>
</html>