1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
|
# See the file LICENSE for redistribution information.
#
# Copyright (c) 1996-2009 Oracle. All rights reserved.
#
# The procs in this file are used for replication messaging
# ONLY when the default mechanism of setting up a queue of
# messages in a environment is not possible. This situation
# is fairly rare, but it is necessary when a replication
# test simultaneously runs different versions of Berkeley DB,
# because different versions cannot share an env.
#
# Note, all procs should be named with the suffix _noenv
# so it's explicit that we are using them.
#
# Close up a replication group - close all message dbs.
proc replclose_noenv { queuedir } {
global queuedbs machids
set dbs [array names queuedbs]
foreach tofrom $dbs {
set handle $queuedbs($tofrom)
error_check_good db_close [$handle close] 0
unset queuedbs($tofrom)
}
set machids {}
}
# Create a replication group for testing.
proc replsetup_noenv { queuedir } {
global queuedbs machids
file mkdir $queuedir
# If there are any leftover handles, get rid of them.
set dbs [array names queuedbs]
foreach tofrom $dbs {
unset queuedbs($tofrom)
}
set machids {}
}
# Send function for replication.
proc replsend_noenv { control rec fromid toid flags lsn } {
global is_repchild
global queuedbs machids
global drop drop_msg
global perm_sent_list
global anywhere
global qtestdir testdir
if { ![info exists qtestdir] } {
set qtestdir $testdir
}
set queuedir $qtestdir/MSGQUEUEDIR
set permflags [lsearch $flags "perm"]
if { [llength $perm_sent_list] != 0 && $permflags != -1 } {
# puts "replsend_noenv sent perm message, LSN $lsn"
lappend perm_sent_list $lsn
}
#
# If we are testing with dropped messages, then we drop every
# $drop_msg time. If we do that just return 0 and don't do
# anything.
#
if { $drop != 0 } {
incr drop
if { $drop == $drop_msg } {
set drop 1
return 0
}
}
# XXX
# -1 is DB_BROADCAST_EID
if { $toid == -1 } {
set machlist $machids
} else {
set m NULL
# If we can send this anywhere, send it to the first id
# we find that is neither toid or fromid. If we don't
# find any other candidates, this falls back to the
# original toid.
if { $anywhere != 0 } {
set anyflags [lsearch $flags "any"]
if { $anyflags != -1 } {
foreach m $machids {
if { $m == $fromid || $m == $toid } {
continue
}
set machlist [list $m]
break
}
}
}
#
# If we didn't find a different site, fall back
# to the toid.
#
if { $m == "NULL" } {
set machlist [list $toid]
}
}
foreach m $machlist {
# Do not broadcast to self.
if { $m == $fromid } {
continue
}
# Find the handle for the right message file.
set pid [pid]
set db $queuedbs($m.$fromid.$pid)
set stat [catch {$db put -append [list $control $rec $fromid]} ret]
}
if { $is_repchild } {
replready_noenv $fromid from
}
return 0
}
#
proc replmsglen_noenv { machid {tf "to"}} {
global queuedbs qtestdir testdir
if { ![info exists qtestdir] } {
set qtestdir $testdir
}
set queuedir $qtestdir/MSGQUEUEDIR
set orig [pwd]
cd $queuedir
if { $tf == "to" } {
set msgdbs [glob -nocomplain ready.$machid.*]
} else {
set msgdbs [glob -nocomplain ready.*.$machid.*]
}
cd $orig
return [llength $msgdbs]
}
# Discard all the pending messages for a particular site.
proc replclear_noenv { machid {tf "to"}} {
global queuedbs qtestdir testdir
if { ![info exists qtestdir] } {
set qtestdir $testdir
}
set queuedir $qtestdir/MSGQUEUEDIR
set orig [pwd]
cd $queuedir
if { $tf == "to" } {
set msgdbs [glob -nocomplain ready.$machid.*]
} else {
set msgdbs [glob -nocomplain ready.*.$machid.*]
}
foreach m $msgdbs {
file delete -force $m
}
cd $orig
set dbs [array names queuedbs]
foreach tofrom $dbs {
# Process only messages _to_ the specified machid.
if { [string match $machid.* $tofrom] == 1 } {
set db $queuedbs($tofrom)
set dbc [$db cursor]
for { set dbt [$dbc get -first] } \
{ [llength $dbt] > 0 } \
{ set dbt [$dbc get -next] } {
error_check_good \
replclear($machid)_del [$dbc del] 0
}
error_check_good replclear($db)_dbc_close [$dbc close] 0
}
}
cd $queuedir
if { $tf == "to" } {
set msgdbs [glob -nocomplain temp.$machid.*]
} else {
set msgdbs [glob -nocomplain temp.*.$machid.*]
}
foreach m $msgdbs {
# file delete -force $m
}
cd $orig
}
# Makes messages available to replprocessqueue by closing and
# renaming the message files. We ready the files for one machine
# ID at a time -- just those "to" or "from" the machine we want to
# process, depending on 'tf'.
proc replready_noenv { machid tf } {
global queuedbs machids
global counter
global qtestdir testdir
if { ![info exists qtestdir] } {
set qtestdir $testdir
}
set queuedir $qtestdir/MSGQUEUEDIR
set pid [pid]
#
# Close the temporary message files for the specified machine.
# Only close it if there are messages available.
#
set dbs [array names queuedbs]
set closed {}
foreach tofrom $dbs {
set toidx [string first . $tofrom]
set toid [string replace $tofrom $toidx end]
set fidx [expr $toidx + 1]
set fromidx [string first . $tofrom $fidx]
#
# First chop off the end, then chop off the toid
# in the beginning.
#
set fromid [string replace $tofrom $fromidx end]
set fromid [string replace $fromid 0 $toidx]
if { ($tf == "to" && $machid == $toid) || \
($tf == "from" && $machid == $fromid) } {
set nkeys [stat_field $queuedbs($tofrom) \
stat "Number of keys"]
if { $nkeys != 0 } {
lappend closed \
[list $toid $fromid temp.$tofrom]
error_check_good temp_close \
[$queuedbs($tofrom) close] 0
}
}
}
# Rename the message files.
set cwd [pwd]
foreach filename $closed {
set toid [lindex $filename 0]
set fromid [lindex $filename 1]
set fname [lindex $filename 2]
set tofrom [string replace $fname 0 4]
incr counter($machid)
cd $queuedir
# puts "$queuedir: Msg ready $fname to ready.$tofrom.$counter($machid)"
file rename -force $fname ready.$tofrom.$counter($machid)
cd $cwd
replsetuptempfile_noenv $toid $fromid $queuedir
}
}
# Add a machine to a replication environment. This checks
# that we have not already established that machine id, and
# adds the machid to the list of ids.
proc repladd_noenv { machid } {
global queuedbs machids counter qtestdir testdir
if { ![info exists qtestdir] } {
set qtestdir $testdir
}
set queuedir $qtestdir/MSGQUEUEDIR
if { [info exists machids] } {
if { [lsearch -exact $machids $machid] >= 0 } {
error "FAIL: repladd_noenv: machid $machid already exists."
}
}
set counter($machid) 0
lappend machids $machid
# Create all the databases that receive messages sent _to_
# the new machid.
replcreatetofiles_noenv $machid $queuedir
# Create all the databases that receive messages sent _from_
# the new machid.
replcreatefromfiles_noenv $machid $queuedir
}
# Creates all the databases that a machid needs for receiving messages
# from other participants in a replication group. Used when first
# establishing the temp files, but also used whenever replready_noenv moves
# the temp files away, because we'll need new files for any future messages.
proc replcreatetofiles_noenv { toid queuedir } {
global machids
foreach m $machids {
# We don't need a file for a machid to send itself messages.
if { $m == $toid } {
continue
}
replsetuptempfile_noenv $toid $m $queuedir
}
}
# Creates all the databases that a machid needs for sending messages
# to other participants in a replication group. Used when first
# establishing the temp files only. Replready moves files based on
# recipient, so we recreate files based on the recipient, also.
proc replcreatefromfiles_noenv { fromid queuedir } {
global machids
foreach m $machids {
# We don't need a file for a machid to send itself messages.
if { $m == $fromid } {
continue
}
replsetuptempfile_noenv $m $fromid $queuedir
}
}
proc replsetuptempfile_noenv { to from queuedir } {
global queuedbs
set pid [pid]
# puts "Open new temp.$to.$from.$pid"
set queuedbs($to.$from.$pid) [berkdb_open -create -excl -recno\
-renumber $queuedir/temp.$to.$from.$pid]
error_check_good open_queuedbs [is_valid_db $queuedbs($to.$from.$pid)] TRUE
}
# Process a queue of messages, skipping every "skip_interval" entry.
# We traverse the entire queue, but since we skip some messages, we
# may end up leaving things in the queue, which should get picked up
# on a later run.
proc replprocessqueue_noenv { dbenv machid { skip_interval 0 } { hold_electp NONE } \
{ dupmasterp NONE } { errp NONE } } {
global errorCode
global perm_response_list
global qtestdir testdir
# hold_electp is a call-by-reference variable which lets our caller
# know we need to hold an election.
if { [string compare $hold_electp NONE] != 0 } {
upvar $hold_electp hold_elect
}
set hold_elect 0
# dupmasterp is a call-by-reference variable which lets our caller
# know we have a duplicate master.
if { [string compare $dupmasterp NONE] != 0 } {
upvar $dupmasterp dupmaster
}
set dupmaster 0
# errp is a call-by-reference variable which lets our caller
# know we have gotten an error (that they expect).
if { [string compare $errp NONE] != 0 } {
upvar $errp errorp
}
set errorp 0
set nproced 0
set queuedir $qtestdir/MSGQUEUEDIR
# puts "replprocessqueue_noenv: Make ready messages to eid $machid"
# Change directories temporarily so we get just the msg file name.
set cwd [pwd]
cd $queuedir
set msgdbs [glob -nocomplain ready.$machid.*]
# puts "$queuedir.$machid: My messages: $msgdbs"
cd $cwd
foreach msgdb $msgdbs {
set db [berkdb_open $queuedir/$msgdb]
set dbc [$db cursor]
error_check_good process_dbc($machid) \
[is_valid_cursor $dbc $db] TRUE
for { set dbt [$dbc get -first] } \
{ [llength $dbt] != 0 } \
{ set dbt [$dbc get -next] } {
set data [lindex [lindex $dbt 0] 1]
set recno [lindex [lindex $dbt 0] 0]
# If skip_interval is nonzero, we want to process
# messages out of order. We do this in a simple but
# slimy way -- continue walking with the cursor
# without processing the message or deleting it from
# the queue, but do increment "nproced". The way
# this proc is normally used, the precise value of
# nproced doesn't matter--we just don't assume the
# queues are empty if it's nonzero. Thus, if we
# contrive to make sure it's nonzero, we'll always
# come back to records we've skipped on a later call
# to replprocessqueue. (If there really are no records,
# we'll never get here.)
#
# Skip every skip_interval'th record (and use a
# remainder other than zero so that we're guaranteed
# to really process at least one record on every call).
if { $skip_interval != 0 } {
if { $nproced % $skip_interval == 1 } {
incr nproced
set dbt [$dbc get -next]
continue
}
}
# We need to remove the current message from the
# queue, because we're about to end the transaction
# and someone else processing messages might come in
# and reprocess this message which would be bad.
#
error_check_good queue_remove [$dbc del] 0
# We have to play an ugly cursor game here: we
# currently hold a lock on the page of messages, but
# rep_process_message might need to lock the page with
# a different cursor in order to send a response. So
# save the next recno, close the cursor, and then
# reopen and reset the cursor. If someone else is
# processing this queue, our entry might have gone
# away, and we need to be able to handle that.
#
# error_check_good dbc_process_close [$dbc close] 0
set ret [catch {$dbenv rep_process_message \
[lindex $data 2] [lindex $data 0] \
[lindex $data 1]} res]
# Save all ISPERM and NOTPERM responses so we can
# compare their LSNs to the LSN in the log. The
# variable perm_response_list holds the entire
# response so we can extract responses and LSNs as
# needed.
#
if { [llength $perm_response_list] != 0 && \
([is_substr $res ISPERM] || [is_substr $res NOTPERM]) } {
lappend perm_response_list $res
}
if { $ret != 0 } {
if { [string compare $errp NONE] != 0 } {
set errorp "$dbenv $machid $res"
} else {
error "FAIL:[timestamp]\
rep_process_message returned $res"
}
}
incr nproced
if { $ret == 0 } {
set rettype [lindex $res 0]
set retval [lindex $res 1]
#
# Do nothing for 0 and NEWSITE
#
if { [is_substr $rettype HOLDELECTION] } {
set hold_elect 1
}
if { [is_substr $rettype DUPMASTER] } {
set dupmaster "1 $dbenv $machid"
}
if { [is_substr $rettype NOTPERM] || \
[is_substr $rettype ISPERM] } {
set lsnfile [lindex $retval 0]
set lsnoff [lindex $retval 1]
}
}
if { $errorp != 0 } {
# Break on an error, caller wants to handle it.
break
}
if { $hold_elect == 1 } {
# Break on a HOLDELECTION, for the same reason.
break
}
if { $dupmaster == 1 } {
# Break on a DUPMASTER, for the same reason.
break
}
}
error_check_good dbc_close [$dbc close] 0
#
# Check the number of keys remaining because we only
# want to rename to done, message file that are
# fully processed. Some message types might break
# out of the loop early and we want to process
# the remaining messages the next time through.
#
set nkeys [stat_field $db stat "Number of keys"]
error_check_good db_close [$db close] 0
if { $nkeys == 0 } {
set dbname [string replace $msgdb 0 5 done.]
#
# We have to do a special dance to get rid of the
# empty messaging files because of the way Windows
# handles open files marked for deletion.
# On Windows, a file is marked for deletion but
# does not actually get deleted until the last handle
# is closed. This causes a problem when a test tries
# to create a new file with a previously-used name,
# and Windows believes the old file still exists.
# Therefore, we rename the files before deleting them,
# to guarantee they are out of the way.
#
file rename -force $queuedir/$msgdb $queuedir/$dbname
file delete -force $queuedir/$dbname
}
}
# Return the number of messages processed.
return $nproced
}
|