Remove definition of builtin function
[platform/upstream/db4.git] / btree / bt_put.c
1 /*-
2  * See the file LICENSE for redistribution information.
3  *
4  * Copyright (c) 1996-2009 Oracle.  All rights reserved.
5  */
6 /*
7  * Copyright (c) 1990, 1993, 1994, 1995, 1996
8  *      Keith Bostic.  All rights reserved.
9  */
10 /*
11  * Copyright (c) 1990, 1993, 1994, 1995
12  *      The Regents of the University of California.  All rights reserved.
13  *
14  * This code is derived from software contributed to Berkeley by
15  * Mike Olson.
16  *
17  * Redistribution and use in source and binary forms, with or without
18  * modification, are permitted provided that the following conditions
19  * are met:
20  * 1. Redistributions of source code must retain the above copyright
21  *    notice, this list of conditions and the following disclaimer.
22  * 2. Redistributions in binary form must reproduce the above copyright
23  *    notice, this list of conditions and the following disclaimer in the
24  *    documentation and/or other materials provided with the distribution.
25  * 3. Neither the name of the University nor the names of its contributors
26  *    may be used to endorse or promote products derived from this software
27  *    without specific prior written permission.
28  *
29  * THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
30  * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
31  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
32  * ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
33  * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
34  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
35  * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
36  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
37  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
38  * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
39  * SUCH DAMAGE.
40  *
41  * $Id$
42  */
43
44 #include "db_config.h"
45
46 #include "db_int.h"
47 #include "dbinc/db_page.h"
48 #include "dbinc/btree.h"
49 #include "dbinc/lock.h"
50 #include "dbinc/mp.h"
51
52 static int __bam_build
53                __P((DBC *, u_int32_t, DBT *, PAGE *, u_int32_t, u_int32_t));
54 static int __bam_dup_check __P((DBC *, u_int32_t,
55                 PAGE *, u_int32_t, u_int32_t, db_indx_t *));
56 static int __bam_dup_convert __P((DBC *, PAGE *, u_int32_t, u_int32_t));
57 static int __bam_ovput
58                __P((DBC *, u_int32_t, db_pgno_t, PAGE *, u_int32_t, DBT *));
59 static u_int32_t
60            __bam_partsize __P((DB *, u_int32_t, DBT *, PAGE *, u_int32_t));
61
62 /*
63  * __bam_iitem --
64  *      Insert an item into the tree.
65  *
66  * PUBLIC: int __bam_iitem __P((DBC *, DBT *, DBT *, u_int32_t, u_int32_t));
67  */
68 int
69 __bam_iitem(dbc, key, data, op, flags)
70         DBC *dbc;
71         DBT *key, *data;
72         u_int32_t op, flags;
73 {
74         BKEYDATA *bk, bk_tmp;
75         BTREE *t;
76         BTREE_CURSOR *cp;
77         DB *dbp;
78         DBT bk_hdr, tdbt;
79         DB_MPOOLFILE *mpf;
80         ENV *env;
81         PAGE *h;
82         db_indx_t cnt, indx;
83         u_int32_t data_size, have_bytes, need_bytes, needed, pages, pagespace;
84         char tmp_ch;
85         int cmp, bigkey, bigdata, del, dupadjust;
86         int padrec, replace, ret, t_ret, was_deleted;
87
88         COMPQUIET(cnt, 0);
89
90         dbp = dbc->dbp;
91         env = dbp->env;
92         mpf = dbp->mpf;
93         cp = (BTREE_CURSOR *)dbc->internal;
94         t = dbp->bt_internal;
95         h = cp->page;
96         indx = cp->indx;
97         del = dupadjust = replace = was_deleted = 0;
98
99         /*
100          * Fixed-length records with partial puts: it's an error to specify
101          * anything other simple overwrite.
102          */
103         if (F_ISSET(dbp, DB_AM_FIXEDLEN) &&
104             F_ISSET(data, DB_DBT_PARTIAL) && data->size != data->dlen)
105                 return (__db_rec_repl(env, data->size, data->dlen));
106
107         /*
108          * Figure out how much space the data will take, including if it's a
109          * partial record.
110          *
111          * Fixed-length records: it's an error to specify a record that's
112          * longer than the fixed-length, and we never require less than
113          * the fixed-length record size.
114          */
115         data_size = F_ISSET(data, DB_DBT_PARTIAL) ?
116             __bam_partsize(dbp, op, data, h, indx) : data->size;
117         padrec = 0;
118         if (F_ISSET(dbp, DB_AM_FIXEDLEN)) {
119                 if (data_size > t->re_len)
120                         return (__db_rec_toobig(env, data_size, t->re_len));
121
122                 /* Records that are deleted anyway needn't be padded out. */
123                 if (!LF_ISSET(BI_DELETED) && data_size < t->re_len) {
124                         padrec = 1;
125                         data_size = t->re_len;
126                 }
127         }
128
129         /*
130          * Handle partial puts or short fixed-length records: check whether we
131          * can just append the data or else build the real record.  We can't
132          * append if there are secondaries: we need the whole data item for the
133          * application's secondary callback.
134          */
135         if (op == DB_CURRENT && dbp->dup_compare == NULL &&
136             F_ISSET(data, DB_DBT_PARTIAL) && !DB_IS_PRIMARY(dbp)) {
137                 bk = GET_BKEYDATA(
138                     dbp, h, indx + (TYPE(h) == P_LBTREE ? O_INDX : 0));
139                 /*
140                  * If the item is an overflow type, and the input DBT is
141                  * partial, and begins at the length of the current item then
142                  * it is an append. Avoid deleting and re-creating the entire
143                  * offpage item.
144                  */
145                 if (B_TYPE(bk->type) == B_OVERFLOW &&
146                     data->doff == ((BOVERFLOW *)bk)->tlen) {
147                         /*
148                          * If the cursor has not already cached the last page
149                          * in the offpage chain. We need to walk the chain
150                          * to be sure that the page has been read.
151                          */
152                         if (cp->stream_start_pgno != ((BOVERFLOW *)bk)->pgno ||
153                             cp->stream_off > data->doff || data->doff >
154                             cp->stream_off + P_MAXSPACE(dbp, dbp->pgsize)) {
155                                 memset(&tdbt, 0, sizeof(DBT));
156                                 tdbt.doff = data->doff - 1;
157                                 /*
158                                  * Set the length to 1, to force __db_goff
159                                  * to do the traversal.
160                                  */
161                                 tdbt.dlen = tdbt.ulen = 1;
162                                 tdbt.data = &tmp_ch;
163                                 tdbt.flags = DB_DBT_PARTIAL | DB_DBT_USERMEM;
164
165                                 /*
166                                  * Read to the last page.  It will be cached
167                                  * in the cursor.
168                                  */
169                                 if ((ret = __db_goff(
170                                     dbc, &tdbt, ((BOVERFLOW *)bk)->tlen,
171                                     ((BOVERFLOW *)bk)->pgno, NULL, NULL)) != 0)
172                                         return (ret);
173                         }
174
175                         /*
176                          * Since this is an append, dlen is irrelevant (there
177                          * are no bytes to overwrite). We need the caller's
178                          * DBT size to end up with the total size of the item.
179                          * From now on, use dlen as the length of the user's
180                          * data that we are going to append.
181                          * Don't futz with the caller's DBT any more than we
182                          * have to in order to send back the size.
183                          */
184                         tdbt = *data;
185                         tdbt.dlen = data->size;
186                         tdbt.size = data_size;
187                         data = &tdbt;
188                         F_SET(data, DB_DBT_STREAMING);
189                 }
190         }
191         if (!F_ISSET(data, DB_DBT_STREAMING) &&
192             (padrec || F_ISSET(data, DB_DBT_PARTIAL))) {
193                 tdbt = *data;
194                 if ((ret =
195                     __bam_build(dbc, op, &tdbt, h, indx, data_size)) != 0)
196                         return (ret);
197                 data = &tdbt;
198         }
199
200         /*
201          * If the user has specified a duplicate comparison function, return
202          * an error if DB_CURRENT was specified and the replacement data
203          * doesn't compare equal to the current data.  This stops apps from
204          * screwing up the duplicate sort order.  We have to do this after
205          * we build the real record so that we're comparing the real items.
206          */
207         if (op == DB_CURRENT && dbp->dup_compare != NULL) {
208                 if ((ret = __bam_cmp(dbc, data, h,
209                     indx + (TYPE(h) == P_LBTREE ? O_INDX : 0),
210                     dbp->dup_compare, &cmp)) != 0)
211                         return (ret);
212                 if (cmp != 0) {
213                         __db_errx(env,
214                 "Existing data sorts differently from put data");
215                         return (EINVAL);
216                 }
217         }
218
219         /*
220          * If the key or data item won't fit on a page, we'll have to store
221          * them on overflow pages.
222          */
223         needed = 0;
224         bigdata = data_size > cp->ovflsize;
225         switch (op) {
226         case DB_KEYFIRST:
227                 /* We're adding a new key and data pair. */
228                 bigkey = key->size > cp->ovflsize;
229                 if (bigkey)
230                         needed += BOVERFLOW_PSIZE;
231                 else
232                         needed += BKEYDATA_PSIZE(key->size);
233                 if (bigdata)
234                         needed += BOVERFLOW_PSIZE;
235                 else
236                         needed += BKEYDATA_PSIZE(data_size);
237                 break;
238         case DB_AFTER:
239         case DB_BEFORE:
240         case DB_CURRENT:
241                 /*
242                  * We're either overwriting the data item of a key/data pair
243                  * or we're creating a new on-page duplicate and only adding
244                  * a data item.
245                  *
246                  * !!!
247                  * We're not currently correcting for space reclaimed from
248                  * already deleted items, but I don't think it's worth the
249                  * complexity.
250                  */
251                 bigkey = 0;
252                 if (op == DB_CURRENT) {
253                         bk = GET_BKEYDATA(dbp, h,
254                             indx + (TYPE(h) == P_LBTREE ? O_INDX : 0));
255                         if (B_TYPE(bk->type) == B_KEYDATA)
256                                 have_bytes = BKEYDATA_PSIZE(bk->len);
257                         else
258                                 have_bytes = BOVERFLOW_PSIZE;
259                         need_bytes = 0;
260                 } else {
261                         have_bytes = 0;
262                         need_bytes = sizeof(db_indx_t);
263                 }
264                 if (bigdata)
265                         need_bytes += BOVERFLOW_PSIZE;
266                 else
267                         need_bytes += BKEYDATA_PSIZE(data_size);
268
269                 if (have_bytes < need_bytes)
270                         needed += need_bytes - have_bytes;
271                 break;
272         default:
273                 return (__db_unknown_flag(env, "DB->put", op));
274         }
275
276         /* Split the page if there's not enough room. */
277         if (P_FREESPACE(dbp, h) < needed)
278                 return (DB_NEEDSPLIT);
279
280         /*
281          * Check to see if we will convert to off page duplicates -- if
282          * so, we'll need a page.
283          */
284         if (F_ISSET(dbp, DB_AM_DUP) &&
285             TYPE(h) == P_LBTREE && op != DB_KEYFIRST &&
286             P_FREESPACE(dbp, h) - needed <= dbp->pgsize / 2 &&
287             __bam_dup_check(dbc, op, h, indx, needed, &cnt)) {
288                 pages = 1;
289                 dupadjust = 1;
290         } else
291                 pages = 0;
292
293         /*
294          * If we are not using transactions and there is a page limit
295          * set on the file, then figure out if things will fit before
296          * taking action.
297          */
298         if (dbc->txn == NULL && mpf->mfp->maxpgno != 0) {
299                 pagespace = P_MAXSPACE(dbp, dbp->pgsize);
300                 if (bigdata)
301                         pages += ((data_size - 1) / pagespace) + 1;
302                 if (bigkey)
303                         pages += ((key->size - 1) / pagespace) + 1;
304
305                 if (pages > (mpf->mfp->maxpgno - mpf->mfp->last_pgno))
306                         return (__db_space_err(dbp));
307         }
308
309         ret = __memp_dirty(mpf, &h,
310              dbc->thread_info, dbc->txn, dbc->priority, 0);
311         if (cp->csp->page == cp->page)
312                 cp->csp->page = h;
313         cp->page = h;
314         if (ret != 0)
315                 return (ret);
316
317         /*
318          * The code breaks it up into five cases:
319          *
320          * 1. Insert a new key/data pair.
321          * 2. Append a new data item (a new duplicate).
322          * 3. Insert a new data item (a new duplicate).
323          * 4. Delete and re-add the data item (overflow item).
324          * 5. Overwrite the data item.
325          */
326         switch (op) {
327         case DB_KEYFIRST:               /* 1. Insert a new key/data pair. */
328                 if (bigkey) {
329                         if ((ret = __bam_ovput(dbc,
330                             B_OVERFLOW, PGNO_INVALID, h, indx, key)) != 0)
331                                 return (ret);
332                 } else
333                         if ((ret = __db_pitem(dbc, h, indx,
334                             BKEYDATA_SIZE(key->size), NULL, key)) != 0)
335                                 return (ret);
336
337                 if ((ret = __bam_ca_di(dbc, PGNO(h), indx, 1)) != 0)
338                         return (ret);
339                 ++indx;
340                 break;
341         case DB_AFTER:                  /* 2. Append a new data item. */
342                 if (TYPE(h) == P_LBTREE) {
343                         /* Copy the key for the duplicate and adjust cursors. */
344                         if ((ret =
345                             __bam_adjindx(dbc, h, indx + P_INDX, indx, 1)) != 0)
346                                 return (ret);
347                         if ((ret =
348                             __bam_ca_di(dbc, PGNO(h), indx + P_INDX, 1)) != 0)
349                                 return (ret);
350
351                         indx += 3;
352
353                         cp->indx += 2;
354                 } else {
355                         ++indx;
356                         cp->indx += 1;
357                 }
358                 break;
359         case DB_BEFORE:                 /* 3. Insert a new data item. */
360                 if (TYPE(h) == P_LBTREE) {
361                         /* Copy the key for the duplicate and adjust cursors. */
362                         if ((ret = __bam_adjindx(dbc, h, indx, indx, 1)) != 0)
363                                 return (ret);
364                         if ((ret = __bam_ca_di(dbc, PGNO(h), indx, 1)) != 0)
365                                 return (ret);
366
367                         ++indx;
368                 }
369                 break;
370         case DB_CURRENT:
371                  /*
372                   * Clear the cursor's deleted flag.  The problem is that if
373                   * we deadlock or fail while deleting the overflow item or
374                   * replacing the non-overflow item, a subsequent cursor close
375                   * will try and remove the item because the cursor's delete
376                   * flag is set.
377                   */
378                 if ((ret = __bam_ca_delete(dbp, PGNO(h), indx, 0, NULL)) != 0)
379                         return (ret);
380
381                 if (TYPE(h) == P_LBTREE)
382                         ++indx;
383                 bk = GET_BKEYDATA(dbp, h, indx);
384
385                 /*
386                  * In a Btree deleted records aren't counted (deleted records
387                  * are counted in a Recno because all accesses are based on
388                  * record number).  If it's a Btree and it's a DB_CURRENT
389                  * operation overwriting a previously deleted record, increment
390                  * the record count.
391                  */
392                 if (TYPE(h) == P_LBTREE || TYPE(h) == P_LDUP)
393                         was_deleted = B_DISSET(bk->type);
394
395                 /*
396                  * 4. Delete and re-add the data item.
397                  *
398                  * If we're changing the type of the on-page structure, or we
399                  * are referencing offpage items, we have to delete and then
400                  * re-add the item.  We do not do any cursor adjustments here
401                  * because we're going to immediately re-add the item into the
402                  * same slot.
403                  */
404                 if (bigdata || B_TYPE(bk->type) != B_KEYDATA) {
405                         /*
406                          * If streaming, don't delete the overflow item,
407                          * just delete the item pointing to the overflow item.
408                          * It will be added back in later, with the new size.
409                          * We can't simply adjust the size of the item on the
410                          * page, because there is no easy way to log a
411                          * modification.
412                          */
413                         if (F_ISSET(data, DB_DBT_STREAMING)) {
414                                 if ((ret = __db_ditem(
415                                     dbc, h, indx, BOVERFLOW_SIZE)) != 0)
416                                         return (ret);
417                         } else if ((ret = __bam_ditem(dbc, h, indx)) != 0)
418                                 return (ret);
419                         del = 1;
420                         break;
421                 }
422
423                 /* 5. Overwrite the data item. */
424                 replace = 1;
425                 break;
426         default:
427                 return (__db_unknown_flag(env, "DB->put", op));
428         }
429
430         /* Add the data. */
431         if (bigdata) {
432                 /*
433                  * We do not have to handle deleted (BI_DELETED) records
434                  * in this case; the actual records should never be created.
435                  */
436                 DB_ASSERT(env, !LF_ISSET(BI_DELETED));
437                 ret = __bam_ovput(dbc,
438                     B_OVERFLOW, PGNO_INVALID, h, indx, data);
439         } else {
440                 if (LF_ISSET(BI_DELETED)) {
441                         B_TSET_DELETED(bk_tmp.type, B_KEYDATA);
442                         bk_tmp.len = data->size;
443                         bk_hdr.data = &bk_tmp;
444                         bk_hdr.size = SSZA(BKEYDATA, data);
445                         ret = __db_pitem(dbc, h, indx,
446                             BKEYDATA_SIZE(data->size), &bk_hdr, data);
447                 } else if (replace)
448                         ret = __bam_ritem(dbc, h, indx, data, 0);
449                 else
450                         ret = __db_pitem(dbc, h, indx,
451                             BKEYDATA_SIZE(data->size), NULL, data);
452         }
453         if (ret != 0) {
454                 if (del == 1 && (t_ret =
455                      __bam_ca_di(dbc, PGNO(h), indx + 1, -1)) != 0) {
456                         __db_err(env, t_ret,
457                             "cursor adjustment after delete failed");
458                         return (__env_panic(env, t_ret));
459                 }
460                 return (ret);
461         }
462
463         /*
464          * Re-position the cursors if necessary and reset the current cursor
465          * to point to the new item.
466          */
467         if (op != DB_CURRENT) {
468                 if ((ret = __bam_ca_di(dbc, PGNO(h), indx, 1)) != 0)
469                         return (ret);
470                 cp->indx = TYPE(h) == P_LBTREE ? indx - O_INDX : indx;
471         }
472
473         /*
474          * If we've changed the record count, update the tree.  There's no
475          * need to adjust the count if the operation not performed on the
476          * current record or when the current record was previously deleted.
477          */
478         if (F_ISSET(cp, C_RECNUM) && (op != DB_CURRENT || was_deleted))
479                 if ((ret = __bam_adjust(dbc, 1)) != 0)
480                         return (ret);
481
482         /*
483          * If a Btree leaf page is at least 50% full and we may have added or
484          * modified a duplicate data item, see if the set of duplicates takes
485          * up at least 25% of the space on the page.  If it does, move it onto
486          * its own page.
487          */
488         if (dupadjust &&
489             (ret = __bam_dup_convert(dbc, h, indx - O_INDX, cnt)) != 0)
490                 return (ret);
491
492         /* If we've modified a recno file, set the flag. */
493         if (dbc->dbtype == DB_RECNO)
494                 t->re_modified = 1;
495
496         return (ret);
497 }
498
499 /*
500  * __bam_partsize --
501  *      Figure out how much space a partial data item is in total.
502  */
503 static u_int32_t
504 __bam_partsize(dbp, op, data, h, indx)
505         DB *dbp;
506         u_int32_t op, indx;
507         DBT *data;
508         PAGE *h;
509 {
510         BKEYDATA *bk;
511         u_int32_t nbytes;
512
513         /*
514          * If the record doesn't already exist, it's simply the data we're
515          * provided.
516          */
517         if (op != DB_CURRENT)
518                 return (data->doff + data->size);
519
520         /*
521          * Otherwise, it's the data provided plus any already existing data
522          * that we're not replacing.
523          */
524         bk = GET_BKEYDATA(dbp, h, indx + (TYPE(h) == P_LBTREE ? O_INDX : 0));
525         nbytes =
526             B_TYPE(bk->type) == B_OVERFLOW ? ((BOVERFLOW *)bk)->tlen : bk->len;
527
528         return (__db_partsize(nbytes, data));
529 }
530
531 /*
532  * __bam_build --
533  *      Build the real record for a partial put, or short fixed-length record.
534  */
535 static int
536 __bam_build(dbc, op, dbt, h, indx, nbytes)
537         DBC *dbc;
538         u_int32_t op, indx, nbytes;
539         DBT *dbt;
540         PAGE *h;
541 {
542         BKEYDATA *bk, tbk;
543         BOVERFLOW *bo;
544         BTREE *t;
545         DB *dbp;
546         DBT copy, *rdata;
547         u_int32_t len, tlen;
548         u_int8_t *p;
549         int ret;
550
551         COMPQUIET(bo, NULL);
552
553         dbp = dbc->dbp;
554         t = dbp->bt_internal;
555
556         /* We use the record data return memory, it's only a short-term use. */
557         rdata = &dbc->my_rdata;
558         if (rdata->ulen < nbytes) {
559                 if ((ret = __os_realloc(dbp->env,
560                     nbytes, &rdata->data)) != 0) {
561                         rdata->ulen = 0;
562                         rdata->data = NULL;
563                         return (ret);
564                 }
565                 rdata->ulen = nbytes;
566         }
567
568         /*
569          * We use nul or pad bytes for any part of the record that isn't
570          * specified; get it over with.
571          */
572         memset(rdata->data,
573            F_ISSET(dbp, DB_AM_FIXEDLEN) ? t->re_pad : 0, nbytes);
574
575         /*
576          * In the next clauses, we need to do three things: a) set p to point
577          * to the place at which to copy the user's data, b) set tlen to the
578          * total length of the record, not including the bytes contributed by
579          * the user, and c) copy any valid data from an existing record.  If
580          * it's not a partial put (this code is called for both partial puts
581          * and fixed-length record padding) or it's a new key, we can cut to
582          * the chase.
583          */
584         if (!F_ISSET(dbt, DB_DBT_PARTIAL) || op != DB_CURRENT) {
585                 p = (u_int8_t *)rdata->data + dbt->doff;
586                 tlen = dbt->doff;
587                 goto user_copy;
588         }
589
590         /* Find the current record. */
591         if (indx < NUM_ENT(h)) {
592                 bk = GET_BKEYDATA(dbp, h, indx + (TYPE(h) == P_LBTREE ?
593                     O_INDX : 0));
594                 bo = (BOVERFLOW *)bk;
595         } else {
596                 bk = &tbk;
597                 B_TSET(bk->type, B_KEYDATA);
598                 bk->len = 0;
599         }
600         if (B_TYPE(bk->type) == B_OVERFLOW) {
601                 /*
602                  * In the case of an overflow record, we shift things around
603                  * in the current record rather than allocate a separate copy.
604                  */
605                 memset(&copy, 0, sizeof(copy));
606                 if ((ret = __db_goff(dbc, &copy, bo->tlen, bo->pgno,
607                     &rdata->data, &rdata->ulen)) != 0)
608                         return (ret);
609
610                 /* Skip any leading data from the original record. */
611                 tlen = dbt->doff;
612                 p = (u_int8_t *)rdata->data + dbt->doff;
613
614                 /*
615                  * Copy in any trailing data from the original record.
616                  *
617                  * If the original record was larger than the original offset
618                  * plus the bytes being deleted, there is trailing data in the
619                  * original record we need to preserve.  If we aren't deleting
620                  * the same number of bytes as we're inserting, copy it up or
621                  * down, into place.
622                  *
623                  * Use memmove(), the regions may overlap.
624                  */
625                 if (bo->tlen > dbt->doff + dbt->dlen) {
626                         len = bo->tlen - (dbt->doff + dbt->dlen);
627                         if (dbt->dlen != dbt->size)
628                                 memmove(p + dbt->size, p + dbt->dlen, len);
629                         tlen += len;
630                 }
631         } else {
632                 /* Copy in any leading data from the original record. */
633                 memcpy(rdata->data,
634                     bk->data, dbt->doff > bk->len ? bk->len : dbt->doff);
635                 tlen = dbt->doff;
636                 p = (u_int8_t *)rdata->data + dbt->doff;
637
638                 /* Copy in any trailing data from the original record. */
639                 len = dbt->doff + dbt->dlen;
640                 if (bk->len > len) {
641                         memcpy(p + dbt->size, bk->data + len, bk->len - len);
642                         tlen += bk->len - len;
643                 }
644         }
645
646 user_copy:
647         /*
648          * Copy in the application provided data -- p and tlen must have been
649          * initialized above.
650          */
651         memcpy(p, dbt->data, dbt->size);
652         tlen += dbt->size;
653
654         /* Set the DBT to reference our new record. */
655         rdata->size = F_ISSET(dbp, DB_AM_FIXEDLEN) ? t->re_len : tlen;
656         rdata->dlen = 0;
657         rdata->doff = 0;
658         rdata->flags = 0;
659         *dbt = *rdata;
660         return (0);
661 }
662
663 /*
664  * __bam_ritem --
665  *      Replace an item on a page.
666  *
667  * PUBLIC: int __bam_ritem __P((DBC *, PAGE *, u_int32_t, DBT *, u_int32_t));
668  */
669 int
670 __bam_ritem(dbc, h, indx, data, typeflag)
671         DBC *dbc;
672         PAGE *h;
673         u_int32_t indx;
674         DBT *data;
675         u_int32_t typeflag;
676 {
677         BKEYDATA *bk;
678         BINTERNAL *bi;
679         DB *dbp;
680         DBT orig, repl;
681         db_indx_t cnt, lo, ln, min, off, prefix, suffix;
682         int32_t nbytes;
683         u_int32_t len;
684         int ret;
685         db_indx_t *inp;
686         u_int8_t *dp, *p, *t, type;
687
688         dbp = dbc->dbp;
689         bi = NULL;
690         bk = NULL;
691
692         /*
693          * Replace a single item onto a page.  The logic figuring out where
694          * to insert and whether it fits is handled in the caller.  All we do
695          * here is manage the page shuffling.
696          */
697         if (TYPE(h) == P_IBTREE) {
698                 /* Point at the part of the internal struct past the type. */
699                 bi = GET_BINTERNAL(dbp, h, indx);
700                 if (B_TYPE(bi->type) == B_OVERFLOW)
701                         len = BOVERFLOW_SIZE;
702                 else
703                         len = bi->len;
704                 len += SSZA(BINTERNAL, data) - SSZ(BINTERNAL, unused);
705                 dp = &bi->unused;
706                 type = typeflag == 0 ? bi->type :
707                         (bi->type == B_KEYDATA ? B_OVERFLOW : B_KEYDATA);
708         } else {
709                 bk = GET_BKEYDATA(dbp, h, indx);
710                 len = bk->len;
711                 dp = bk->data;
712                 type = bk->type;
713                 typeflag = B_DISSET(type);
714         }
715
716         /* Log the change. */
717         if (DBC_LOGGING(dbc)) {
718                 /*
719                  * We might as well check to see if the two data items share
720                  * a common prefix and suffix -- it can save us a lot of log
721                  * message if they're large.
722                  */
723                 min = data->size < len ? data->size : len;
724                 for (prefix = 0,
725                     p = dp, t = data->data;
726                     prefix < min && *p == *t; ++prefix, ++p, ++t)
727                         ;
728
729                 min -= prefix;
730                 for (suffix = 0,
731                     p = (u_int8_t *)dp + len - 1,
732                     t = (u_int8_t *)data->data + data->size - 1;
733                     suffix < min && *p == *t; ++suffix, --p, --t)
734                         ;
735
736                 /* We only log the parts of the keys that have changed. */
737                 orig.data = (u_int8_t *)dp + prefix;
738                 orig.size = len - (prefix + suffix);
739                 repl.data = (u_int8_t *)data->data + prefix;
740                 repl.size = data->size - (prefix + suffix);
741                 if ((ret = __bam_repl_log(dbp, dbc->txn, &LSN(h), 0, PGNO(h),
742                     &LSN(h), (u_int32_t)indx, typeflag,
743                     &orig, &repl, (u_int32_t)prefix, (u_int32_t)suffix)) != 0)
744                         return (ret);
745         } else
746                 LSN_NOT_LOGGED(LSN(h));
747
748         /*
749          * Set references to the first in-use byte on the page and the
750          * first byte of the item being replaced.
751          */
752         inp = P_INP(dbp, h);
753         p = (u_int8_t *)h + HOFFSET(h);
754         if (TYPE(h) == P_IBTREE) {
755                 t = (u_int8_t *)bi;
756                 lo = (db_indx_t)BINTERNAL_SIZE(bi->len);
757                 ln = (db_indx_t)BINTERNAL_SIZE(data->size -
758                     (SSZA(BINTERNAL, data) - SSZ(BINTERNAL, unused)));
759         } else {
760                 t = (u_int8_t *)bk;
761                 lo = (db_indx_t)BKEYDATA_SIZE(bk->len);
762                 ln = (db_indx_t)BKEYDATA_SIZE(data->size);
763         }
764
765         /*
766          * If the entry is growing in size, shift the beginning of the data
767          * part of the page down.  If the entry is shrinking in size, shift
768          * the beginning of the data part of the page up.  Use memmove(3),
769          * the regions overlap.
770          */
771         if (lo != ln) {
772                 nbytes = lo - ln;               /* Signed difference. */
773                 if (p == t)                     /* First index is fast. */
774                         inp[indx] += nbytes;
775                 else {                          /* Else, shift the page. */
776                         memmove(p + nbytes, p, (size_t)(t - p));
777
778                         /* Adjust the indices' offsets. */
779                         off = inp[indx];
780                         for (cnt = 0; cnt < NUM_ENT(h); ++cnt)
781                                 if (inp[cnt] <= off)
782                                         inp[cnt] += nbytes;
783                 }
784
785                 /* Clean up the page and adjust the item's reference. */
786                 HOFFSET(h) += nbytes;
787                 t += nbytes;
788         }
789
790         /* Copy the new item onto the page. */
791         bk = (BKEYDATA *)t;
792         bk->len = data->size;
793         B_TSET(bk->type, type);
794         memcpy(bk->data, data->data, bk->len);
795
796         /* Remove the length of the internal header elements. */
797         if (TYPE(h) == P_IBTREE)
798                 bk->len -= SSZA(BINTERNAL, data) - SSZ(BINTERNAL, unused);
799
800         return (0);
801 }
802
803 /*
804  * __bam_irep --
805  *      Replace an item on an internal page.
806  *
807  * PUBLIC: int __bam_irep __P((DBC *, PAGE *, u_int32_t, DBT *, DBT *));
808  */
809 int
810 __bam_irep(dbc, h, indx, hdr, data)
811         DBC *dbc;
812         PAGE *h;
813         u_int32_t indx;
814         DBT *hdr;
815         DBT *data;
816 {
817         BINTERNAL *bi, *bn;
818         DB *dbp;
819         DBT dbt;
820         int ret;
821
822         dbp = dbc->dbp;
823
824         bi = GET_BINTERNAL(dbp, h, indx);
825         bn = (BINTERNAL *) hdr->data;
826
827         if (B_TYPE(bi->type) == B_OVERFLOW &&
828             (ret = __db_doff(dbc, ((BOVERFLOW *)bi->data)->pgno)) != 0)
829                 return (ret);
830
831         memset(&dbt, 0, sizeof(dbt));
832         dbt.size = hdr->size + data->size - SSZ(BINTERNAL, unused);
833         if ((ret = __os_malloc(dbp->env, dbt.size, &dbt.data)) != 0)
834                 return (ret);
835         memcpy(dbt.data,
836              (u_int8_t *)hdr->data + SSZ(BINTERNAL, unused),
837              hdr->size - SSZ(BINTERNAL, unused));
838         memcpy((u_int8_t *)dbt.data +
839             hdr->size - SSZ(BINTERNAL, unused), data->data, data->size);
840
841         ret = __bam_ritem(dbc, h, indx, &dbt, bi->type != bn->type);
842
843         __os_free(dbp->env, dbt.data);
844         return (ret);
845 }
846
847 /*
848  * __bam_dup_check --
849  *      Check to see if the duplicate set at indx should have its own page.
850  */
851 static int
852 __bam_dup_check(dbc, op, h, indx, sz, cntp)
853         DBC *dbc;
854         u_int32_t op;
855         PAGE *h;
856         u_int32_t indx, sz;
857         db_indx_t *cntp;
858 {
859         BKEYDATA *bk;
860         DB *dbp;
861         db_indx_t cnt, first, *inp;
862
863         dbp = dbc->dbp;
864         inp = P_INP(dbp, h);
865
866         /*
867          * Count the duplicate records and calculate how much room they're
868          * using on the page.
869          */
870         while (indx > 0 && inp[indx] == inp[indx - P_INDX])
871                 indx -= P_INDX;
872
873         /* Count the key once. */
874         bk = GET_BKEYDATA(dbp, h, indx);
875         sz += B_TYPE(bk->type) == B_KEYDATA ?
876             BKEYDATA_PSIZE(bk->len) : BOVERFLOW_PSIZE;
877
878         /* Sum up all the data items. */
879         first = indx;
880
881         /*
882          * Account for the record being inserted.  If we are replacing it,
883          * don't count it twice.
884          *
885          * We execute the loop with first == indx to get the size of the
886          * first record.
887          */
888         cnt = op == DB_CURRENT ? 0 : 1;
889         for (first = indx;
890             indx < NUM_ENT(h) && inp[first] == inp[indx];
891             ++cnt, indx += P_INDX) {
892                 bk = GET_BKEYDATA(dbp, h, indx + O_INDX);
893                 sz += B_TYPE(bk->type) == B_KEYDATA ?
894                     BKEYDATA_PSIZE(bk->len) : BOVERFLOW_PSIZE;
895         }
896
897         /*
898          * We have to do these checks when the user is replacing the cursor's
899          * data item -- if the application replaces a duplicate item with a
900          * larger data item, it can increase the amount of space used by the
901          * duplicates, requiring this check.  But that means we may have done
902          * this check when it wasn't a duplicate item after all.
903          */
904         if (cnt == 1)
905                 return (0);
906
907         /*
908          * If this set of duplicates is using more than 25% of the page, move
909          * them off.  The choice of 25% is a WAG, but the value must be small
910          * enough that we can always split a page without putting duplicates
911          * on two different pages.
912          */
913         if (sz < dbp->pgsize / 4)
914                 return (0);
915
916         *cntp = cnt;
917         return (1);
918 }
919
920 /*
921  * __bam_dup_convert --
922  *      Move a set of duplicates off-page and into their own tree.
923  */
924 static int
925 __bam_dup_convert(dbc, h, indx, cnt)
926         DBC *dbc;
927         PAGE *h;
928         u_int32_t indx, cnt;
929 {
930         BKEYDATA *bk;
931         DB *dbp;
932         DBT hdr;
933         DB_LOCK lock;
934         DB_MPOOLFILE *mpf;
935         PAGE *dp;
936         db_indx_t cpindx, dindx, first, *inp;
937         int ret, t_ret;
938
939         dbp = dbc->dbp;
940         mpf = dbp->mpf;
941         inp = P_INP(dbp, h);
942
943         /* Move to the beginning of the dup set. */
944         while (indx > 0 && inp[indx] == inp[indx - P_INDX])
945                 indx -= P_INDX;
946
947         /* Get a new page. */
948         if ((ret = __db_new(dbc,
949             dbp->dup_compare == NULL ? P_LRECNO : P_LDUP, &lock, &dp)) != 0)
950                 return (ret);
951         P_INIT(dp, dbp->pgsize, dp->pgno,
952             PGNO_INVALID, PGNO_INVALID, LEAFLEVEL, TYPE(dp));
953
954         /*
955          * Move this set of duplicates off the page.  First points to the first
956          * key of the first duplicate key/data pair, cnt is the number of pairs
957          * we're dealing with.
958          */
959         memset(&hdr, 0, sizeof(hdr));
960         first = indx;
961         dindx = indx;
962         cpindx = 0;
963         do {
964                 /* Move cursors referencing the old entry to the new entry. */
965                 if ((ret = __bam_ca_dup(dbc, first,
966                     PGNO(h), indx, PGNO(dp), cpindx)) != 0)
967                         goto err;
968
969                 /*
970                  * Copy the entry to the new page.  If the off-duplicate page
971                  * If the off-duplicate page is a Btree page (i.e. dup_compare
972                  * will be non-NULL, we use Btree pages for sorted dups,
973                  * and Recno pages for unsorted dups), move all entries
974                  * normally, even deleted ones.  If it's a Recno page,
975                  * deleted entries are discarded (if the deleted entry is
976                  * overflow, then free up those pages).
977                  */
978                 bk = GET_BKEYDATA(dbp, h, dindx + 1);
979                 hdr.data = bk;
980                 hdr.size = B_TYPE(bk->type) == B_KEYDATA ?
981                     BKEYDATA_SIZE(bk->len) : BOVERFLOW_SIZE;
982                 if (dbp->dup_compare == NULL && B_DISSET(bk->type)) {
983                         /*
984                          * Unsorted dups, i.e. recno page, and we have
985                          * a deleted entry, don't move it, but if it was
986                          * an overflow entry, we need to free those pages.
987                          */
988                         if (B_TYPE(bk->type) == B_OVERFLOW &&
989                             (ret = __db_doff(dbc,
990                             (GET_BOVERFLOW(dbp, h, dindx + 1))->pgno)) != 0)
991                                 goto err;
992                 } else {
993                         if ((ret = __db_pitem(
994                             dbc, dp, cpindx, hdr.size, &hdr, NULL)) != 0)
995                                 goto err;
996                         ++cpindx;
997                 }
998                 /* Delete all but the last reference to the key. */
999                 if (cnt != 1) {
1000                         if ((ret = __bam_adjindx(dbc,
1001                             h, dindx, first + 1, 0)) != 0)
1002                                 goto err;
1003                 } else
1004                         dindx++;
1005
1006                 /* Delete the data item. */
1007                 if ((ret = __db_ditem(dbc, h, dindx, hdr.size)) != 0)
1008                         goto err;
1009                 indx += P_INDX;
1010         } while (--cnt);
1011
1012         /* Put in a new data item that points to the duplicates page. */
1013         if ((ret = __bam_ovput(dbc,
1014             B_DUPLICATE, dp->pgno, h, first + 1, NULL)) != 0)
1015                 goto err;
1016
1017         /* Adjust cursors for all the above movements. */
1018         ret = __bam_ca_di(dbc,
1019             PGNO(h), first + P_INDX, (int)(first + P_INDX - indx));
1020
1021 err:    if ((t_ret = __memp_fput(mpf,
1022              dbc->thread_info, dp, dbc->priority)) != 0 && ret == 0)
1023                 ret = t_ret;
1024
1025         (void)__TLPUT(dbc, lock);
1026         return (ret);
1027 }
1028
1029 /*
1030  * __bam_ovput --
1031  *      Build an item for an off-page duplicates page or overflow page and
1032  *      insert it on the page.
1033  */
1034 static int
1035 __bam_ovput(dbc, type, pgno, h, indx, item)
1036         DBC *dbc;
1037         u_int32_t type, indx;
1038         db_pgno_t pgno;
1039         PAGE *h;
1040         DBT *item;
1041 {
1042         BOVERFLOW bo;
1043         DBT hdr;
1044         int ret;
1045
1046         UMRW_SET(bo.unused1);
1047         B_TSET(bo.type, type);
1048         UMRW_SET(bo.unused2);
1049
1050         /*
1051          * If we're creating an overflow item, do so and acquire the page
1052          * number for it.  If we're creating an off-page duplicates tree,
1053          * we are giving the page number as an argument.
1054          */
1055         if (type == B_OVERFLOW) {
1056                 if ((ret = __db_poff(dbc, item, &bo.pgno)) != 0)
1057                         return (ret);
1058                 bo.tlen = item->size;
1059         } else {
1060                 bo.pgno = pgno;
1061                 bo.tlen = 0;
1062         }
1063
1064         /* Store the new record on the page. */
1065         memset(&hdr, 0, sizeof(hdr));
1066         hdr.data = &bo;
1067         hdr.size = BOVERFLOW_SIZE;
1068         return (__db_pitem(dbc, h, indx, BOVERFLOW_SIZE, &hdr, NULL));
1069 }