Imported Upstream version 5.3.21
[platform/upstream/libdb.git] / lang / cxx / stl / dbstl_dbc.h
1 /*-
2  * See the file LICENSE for redistribution information.
3  *
4  * Copyright (c) 2009, 2012 Oracle and/or its affiliates.  All rights reserved.
5  *
6  * $Id$
7  */
8
9 #ifndef _DB_STL_DBC_H
10 #define _DB_STL_DBC_H
11
12 #include <errno.h>
13
14 #include <set>
15
16 #include "dbstl_common.h"
17 #include "dbstl_dbt.h"
18 #include "dbstl_exception.h"
19 #include "dbstl_container.h"
20 #include "dbstl_resource_manager.h"
21
22 START_NS(dbstl)
23
24 // Forward declarations.
25 class db_container;
26 class DbCursorBase;
27 template<Typename data_dt>
28 class RandDbCursor;
29 class DbstlMultipleKeyDataIterator;
30 class DbstlMultipleRecnoDataIterator;
31 using std::set;
32
33 /////////////////////////////////////////////////////////////////////
34 /////////////////////////////////////////////////////////////////////
35 //
36 // LazyDupCursor class template definition.
37 //
38 // This class allows us to make a shallow copy on construction. When the
39 // cursor pointer is first dereferenced a deep copy is made.
40 //
41 // The allowed type for BaseType is DbCursor<> and RandDbCursor<>
42 // The expected usage of this class is:
43 // 1. Create an iterator in container::begin(), the iterator::pcsr.csr_ptr_
44 // points to an object, thus no need to duplicate.
45 // 2. The iterator is created with default argument, thus the
46 // iterator::pcsr.csr_ptr_ and dup_src_ is NULL, and this iterator is
47 // copied using copy constructor for may be many times, but until the
48 // cursor is really used, no cursor is duplicated.
49 //
50 // There is an informing mechanism between an instance of this class and
51 // its dup_src_ cursor: when that cursor is about to change state, it will
52 // inform all registered LazyDupCursor "listeners" of the change, so that
53 // they will duplicate from the cursor before the change, because that
54 // is the expected cursor state for the listeners.
55
56 template <Typename BaseType>
57 class LazyDupCursor
58 {
59         // dup_src_ is used by this class internally to duplicate another
60         // cursor and set to csr_ptr_, and it is assigned in the copy
61         // constructor from another LazyDupCursor object's csr_ptr_; csr_ptr_
62         // is the acutual pointer that is used to perform cursor operations.
63         //
64         BaseType *csr_ptr_, *dup_src_;
65         typedef LazyDupCursor<BaseType> self;
66
67 public:
68         ////////////////////////////////////////////////////////////////////
69         //
70         // Begin public constructors and destructor.
71         //
72         inline LazyDupCursor()
73         {
74                 csr_ptr_ = NULL;
75                 dup_src_ = NULL;
76         }
77
78         // Used in all iterator types' constructors, dbcptr is created
79         // solely for this object, and the cursor is not yet opened, so we
80         // simply assign it to csr_ptr_.
81         explicit inline LazyDupCursor(BaseType *dbcptr)
82         {
83                 csr_ptr_ = dbcptr;
84                 // Already have pointer, do not need to duplicate.
85                 dup_src_ = NULL;
86         }
87
88         // Do not copy to csr_ptr_, shallow copy from dp2.csr_ptr_.
89         LazyDupCursor(const self& dp2)
90         {
91                 csr_ptr_ = NULL;
92                 if (dp2.csr_ptr_)
93                         dup_src_ = dp2.csr_ptr_;
94                 else
95                         dup_src_ = dp2.dup_src_;
96                 if (dup_src_)
97                         dup_src_->add_dupper(this);
98         }
99
100         ~LazyDupCursor()
101         {
102                 // Not duplicated yet, remove from dup_src_.
103                 if (csr_ptr_ == NULL && dup_src_ != NULL)
104                         dup_src_->erase_dupper(this);
105                 if (csr_ptr_)
106                         delete csr_ptr_;// Delete the cursor.
107         }
108
109         ////////////////////////////////////////////////////////////////////
110
111         // Deep copy.
112         inline const self& operator=(const self &dp2)
113         {
114                 BaseType *dcb;
115
116                 dcb = dp2.csr_ptr_ ? dp2.csr_ptr_ : dp2.dup_src_;
117                 this->operator=(dcb);
118
119                 return dp2;
120         }
121
122         // Deep copy.
123         inline BaseType *operator=(BaseType *dcb)
124         {
125
126                 if (csr_ptr_) {
127                         // Only dup_src_ will inform this, not csr_ptr_.
128                         delete csr_ptr_;
129                         csr_ptr_ = NULL;
130                 }
131
132                 if (dcb)
133                         csr_ptr_ = new BaseType(*dcb);
134                 if (dup_src_ != NULL) {
135                         dup_src_->erase_dupper(this);
136                         dup_src_ = NULL;
137                 }
138
139                 return dcb;
140         }
141
142         void set_cursor(BaseType *dbc)
143         {
144                 assert(dbc != NULL);
145                 if (csr_ptr_) {
146                         // Only dup_src_ will inform this, not csr_ptr_.
147                         delete csr_ptr_;
148                         csr_ptr_ = NULL;
149                 }
150
151                 csr_ptr_ = dbc;
152                 if (dup_src_ != NULL) {
153                         dup_src_->erase_dupper(this);
154                         dup_src_ = NULL;
155                 }
156         }
157
158         // If dup_src_ is informing this object, pass false parameter.
159         inline BaseType* duplicate(bool erase_dupper = true)
160         {
161                 assert(dup_src_ != NULL);
162                 if (csr_ptr_) {
163                         // Only dup_src_ will inform this, not csr_ptr_.
164                         delete csr_ptr_;
165                         csr_ptr_ = NULL;
166                 }
167                 csr_ptr_ = new BaseType(*dup_src_);
168                 if (erase_dupper)
169                         dup_src_->erase_dupper(this);
170                 dup_src_ = NULL;
171                 return csr_ptr_;
172         }
173
174         inline BaseType* operator->()
175         {
176                 if (csr_ptr_)
177                         return csr_ptr_;
178
179                 return duplicate();
180         }
181
182         inline operator bool()
183         {
184                 return csr_ptr_ != NULL;
185         }
186
187         inline bool operator!()
188         {
189                 return !csr_ptr_;
190         }
191
192         inline bool operator==(void *p)
193         {
194                 return csr_ptr_ == p;
195         }
196
197         inline BaseType* base_ptr(){
198                 if (csr_ptr_)
199                         return csr_ptr_;
200                 return duplicate();
201         }
202 };
203
204
205 /////////////////////////////////////////////////////////////////////////
206 /////////////////////////////////////////////////////////////////////////
207 //
208 // DbCursorBase class definition.
209 //
210 // DbCursorBase is the base class for DbCursor<> class template, this class
211 // wraps the Berkeley DB cursor, in order for the ResourceManager to close
212 // the Berkeley DB cursor and set the pointer to null. 
213 // If we don't set the cursor to NULL, the handle could become valid again,
214 // since Berkeley DB recycles handles. DB STL would then try to use the same
215 // handle across different instances, which is not supported.
216 //
217 // In ResourceManager, whenver a cursor is opened, it stores the
218 // DbCursorBase* pointer, so that when need to close the cursor, it calls
219 // DbCursorBase::close() function.
220 //
221 class DbCursorBase
222 {
223 protected:
224         Dbc *csr_;
225         DbTxn *owner_txn_;
226         Db *owner_db_;
227         int csr_status_;
228
229 public:
230         enum DbcGetSkipOptions{SKIP_KEY, SKIP_DATA, SKIP_NONE};
231         inline DbTxn *get_owner_txn() const { return owner_txn_;}
232         inline void set_owner_txn(DbTxn *otxn) { owner_txn_ = otxn;}
233
234         inline Db *get_owner_db() const { return owner_db_;}
235         inline void set_owner_db(Db *odb) { owner_db_ = odb;}
236
237         inline Dbc *get_cursor()  const { return csr_;}
238         inline Dbc *&get_cursor_reference() { return csr_;}
239         inline void set_cursor(Dbc*csr1)
240         {
241                 if (csr_)
242                         ResourceManager::instance()->remove_cursor(this);
243                 csr_ = csr1;
244         }
245
246         inline int close()
247         {
248                 int ret = 0;
249
250                 if (csr_ != NULL && (((DBC *)csr_)->flags & DBC_ACTIVE) != 0) {
251                         ret = csr_->close();
252                         csr_ = NULL;
253                 }
254                 return ret;
255         }
256
257         DbCursorBase(){
258                 owner_txn_ = NULL;
259                 owner_db_ = NULL;
260                 csr_ = NULL;
261                 csr_status_ = 0;
262         }
263
264         DbCursorBase(const DbCursorBase &csrbase)
265         {
266                 this->operator=(csrbase);
267         }
268
269         const DbCursorBase &operator=(const DbCursorBase &csrbase)
270         {
271                 owner_txn_ = csrbase.owner_txn_;
272                 owner_db_ = csrbase.owner_db_;
273                 csr_ = NULL; // Need to call DbCursor<>::dup to duplicate.
274                 csr_status_ = 0;
275                 return csrbase;
276         }
277
278         virtual ~DbCursorBase()
279         {
280                 close();
281         }
282 }; // DbCursorBase
283
284 ////////////////////////////////////////////////////////////////////////
285 ///////////////////////////////////////////////////////////////////////
286 //
287 // DbCursor class template definition
288 //
289 // DbCursor is the connection between Berkeley DB and dbstl container classes
290 // it is the wrapper class for Dbc* cursor of Berkeley Db, to be used for
291 // iterator classes of Berkeley DB backed STL container classes.
292 // Requirement:
293 // 1. Deep copy using Dbc->dup.
294 // 2. Dbc*cursor management via ResourceManager class.
295 // 3. Provide methods to do increment, decrement and advance operations,
296 //    advance is only available for random access iterator from DB_RECNO
297 //    containers.
298 //
299
300 template<typename key_dt, typename data_dt>
301 class DbCursor : public DbCursorBase{
302 protected:
303         // Lazy duplication support: store the LazyDupCursor objects which
304         // will duplicate from this cursor.
305         typedef LazyDupCursor<DbCursor<key_dt, data_dt> > dupper_t;
306         typedef LazyDupCursor<RandDbCursor<data_dt> > dupperr_t;
307         typedef set<LazyDupCursor<DbCursor<key_dt, data_dt> >* > dupset_t;
308         typedef set<LazyDupCursor<RandDbCursor<data_dt> >* > dupsetr_t;
309
310         set<LazyDupCursor<DbCursor<key_dt, data_dt> >* > sduppers1_;
311         set<LazyDupCursor<RandDbCursor<data_dt> >* > sduppers2_;
312
313         // We must use DB_DBT_USERMEM for Dbc::get and Db::get if they are
314         // used in multi-threaded application, so we use key_buf_ and
315         // data_buf_ data members for get operations, and initialize them
316         // to use user memory.
317         Dbt key_buf_, data_buf_;
318
319         // Similar to Berkeley DB C++ API's classes, used to iterate through
320         // bulk retrieved key/data pairs.
321         DbstlMultipleKeyDataIterator *multi_itr_;
322         DbstlMultipleRecnoDataIterator *recno_itr_;
323
324         // Whether to use bulk retrieval. If non-zero, do bulk retrieval,
325         // bulk buffer size is this member, otherwise not bulk read.
326         // By default this member is 0.
327         u_int32_t bulk_retrieval_;
328         // Whether to use DB_RMW flag in Dbc::get, by default false.
329         bool rmw_get_;
330
331         // Whether to poll data from cursor's current position on every
332         // get_current_key/data call.
333         // Note that curr_key_/curr_data_ members are always maintained
334         // to contain current k/d value of the pair pointed to by csr_.
335         // If doing bulk retrieval, this flag is ignored, we will always
336         // read data from bulk buffer.
337         bool directdb_get_;
338
339         // Inform LazyDupCursor objects registered in this object to do
340         // duplication because this cursor is to be changed.
341         // This function should be called in any function of
342         // DbCursor and RandDbCursor whenever the cursor is about to change
343         // state(move/close, etc).
344         inline void inform_duppers()
345         {
346                 typename dupset_t::iterator i1;
347                 typename dupsetr_t::iterator i2;
348                 for (i1 = sduppers1_.begin(); i1 != sduppers1_.end(); i1++)
349                         (*i1)->duplicate(false);
350                 for (i2 = sduppers2_.begin(); i2 != sduppers2_.end(); i2++)
351                         (*i2)->duplicate(false);
352                 sduppers1_.clear();
353                 sduppers2_.clear();
354         }
355
356 public:
357         friend class DataItem;
358
359         // Current key/data pair pointed by "csr_" Dbc*cursor. They are both
360         // maintained on cursor movement. If directdb_get_ is true,
361         // they are both refreshed on every get_current{[_key][_data]} call and 
362         // the retrieved key/data pair is returned to user.
363         DataItem curr_key_;
364         DataItem curr_data_;
365
366         typedef DbCursor<key_dt, data_dt> self;
367
368         // This function is used by all iterators to do equals comparison.
369         // Random iterators will also use it to do less than/greater than
370         // comparisons.
371         // Internally, the page number difference or index difference is
372         // returned, so for btree and hash databases, if two cursors point to
373         // the same key/data pair, we will get 0 returned, meaning they are
374         // equal; if return value is not 0, it means no more than that they
375         // they are not equal. We can't assume any order information between
376         // the two cursors. For recno databases, we use the recno to do less
377         // than and greater than comparison. So we can get a reliable knowledge
378         // of the relative position of two iterators from the return value.
379         int compare(const self *csr2) const{
380                 int res, ret;
381
382                 BDBOP(((DBC *)csr_)->cmp((DBC *)csr_, (DBC *)csr2->csr_,
383                     &res, 0), ret);
384                 return res;
385         }
386
387         ////////////////////////////////////////////////////////////////////
388         //
389         // Add and remove cursor change event listeners.
390         //
391         inline void add_dupper(dupper_t *dupper)
392         {
393                 sduppers1_.insert(dupper);
394         }
395
396         inline void add_dupper(dupperr_t *dupper)
397         {
398                 sduppers2_.insert(dupper);
399         }
400
401         inline void erase_dupper(dupper_t *dup1)
402         {
403                 sduppers1_.erase(dup1);
404         }
405
406         inline void erase_dupper(dupperr_t *dup1)
407         {
408                 sduppers2_.erase(dup1);
409         }
410
411         ////////////////////////////////////////////////////////////////////
412
413 public:
414
415         inline bool get_rmw()
416         {
417                 return rmw_get_;
418         }
419
420         bool set_rmw(bool rmw, DB_ENV *env = NULL )
421         {
422                 u_int32_t flag = 0;
423                 DB_ENV *dbenv = NULL;
424                 int ret;
425
426                 if (env)
427                         dbenv = env;
428                 else
429                         dbenv = ((DBC*)csr_)->dbenv;
430                 BDBOP(dbenv->get_open_flags(dbenv, &flag), ret);
431
432                 // DB_RMW flag requires locking subsystem started.
433                 if (rmw && ((flag & DB_INIT_LOCK) || (flag & DB_INIT_CDB) ||
434                     (flag & DB_INIT_TXN)))
435                         rmw_get_ = true;
436                 else
437                         rmw_get_ = false;
438                 return rmw_get_;
439         }
440
441         // Modify bulk buffer size. Bulk read is enabled when creating an
442         // iterator, so users later can only modify the bulk buffer size
443         // to another value, but can't enable/disable bulk read while an
444         // iterator is already alive. 
445         // Returns true if succeeded, false otherwise.
446         inline bool set_bulk_buffer(u_int32_t sz)
447         {
448                 if (bulk_retrieval_ && sz) {
449                         normalize_bulk_bufsize(sz);
450                         bulk_retrieval_ = sz;
451                         return true;
452                 }
453
454                 return false;
455
456         }
457
458         inline u_int32_t get_bulk_bufsize()
459         {
460                 return bulk_retrieval_;
461         }
462
463         inline void enlarge_dbt(Dbt &d, u_int32_t sz)
464         {
465                 void *p;
466
467                 p = DbstlReAlloc(d.get_data(), sz);
468                 dbstl_assert(p != NULL);
469                 d.set_ulen(sz);
470                 d.set_data(p);
471                 d.set_size(sz);
472         }
473         // Move forward or backward, often by 1 key/data pair, we can use
474         // different flags for Dbc::get function. Then update the key/data
475         // pair and csr_status_ members.
476         //
477         int increment(int flag)
478         {
479                 int ret = 0;
480                 Dbt &k = key_buf_, &d = data_buf_;
481                 u_int32_t sz, getflags = 0, bulk_bufsz;
482
483                 if (csr_ == NULL)
484                         return INVALID_ITERATOR_CURSOR;
485                 curr_key_.reset();
486                 curr_data_.reset();
487                 inform_duppers();
488
489                 // Berkeley DB cursor flags are not bitwise set, so we can't
490                 // use bit operations here.
491                 //
492                 if (this->bulk_retrieval_ != 0)
493                         switch (flag) {
494                         case DB_PREV:
495                         case DB_PREV_DUP:
496                         case DB_PREV_NODUP:
497                         case DB_LAST:
498                         case DB_JOIN_ITEM:
499                         case DB_GET_RECNO:
500                         case DB_SET_RECNO: 
501                                 break;
502                         default:
503                                 getflags |= DB_MULTIPLE_KEY;
504                                 if (data_buf_.get_ulen() != bulk_retrieval_)
505                                         enlarge_dbt(data_buf_, bulk_retrieval_);
506                                 break;
507                         }
508
509                 if (this->rmw_get_)
510                         getflags |= DB_RMW;
511
512                 // Do not use BDBOP or BDBOP2 here because it is likely
513                 // that an iteration will step onto end() position.
514 retry:          ret = csr_->get(&k, &d, flag | getflags);
515                 if (ret == 0) {
516                         if (bulk_retrieval_ && (getflags & DB_MULTIPLE_KEY)) {
517                                 // A new retrieval, so both multi_itr_ and
518                                 // recno_itr_ must be NULL.
519                                 if (((DBC*)csr_)->dbtype == DB_RECNO) {
520                                         if (recno_itr_) {
521                                                 delete recno_itr_;
522                                                 recno_itr_ = NULL;
523                                         }
524                                         recno_itr_ =
525                                     new DbstlMultipleRecnoDataIterator(d);
526                                 } else {
527                                         if (multi_itr_) {
528                                                 delete multi_itr_;
529                                                 multi_itr_ = NULL;
530                                         }
531                                         multi_itr_ = new
532                                             DbstlMultipleKeyDataIterator(d);
533                                 }
534                         } else {
535                                 // Non bulk retrieval succeeded.
536                                 curr_key_.set_dbt(k, false);
537                                 curr_data_.set_dbt(d, false);
538                                 limit_buf_size_after_use();
539                         }
540                 } else if (ret == DB_BUFFER_SMALL) {
541                         // Either the key or data DBTs might trigger a
542                         // DB_KEYSMALL return. Only enlarge the DBT if it 
543                         // is actually too small. 
544                         if (((sz = d.get_size()) > 0) && (sz > d.get_ulen()))
545                                 enlarge_dbt(d, sz);
546
547                         if (((sz = k.get_size()) > 0) && (sz > k.get_ulen()))
548                                 enlarge_dbt(k, sz);
549
550                         goto retry;
551                 } else {
552                         if (ret == DB_NOTFOUND) {
553                                 ret = INVALID_ITERATOR_POSITION;
554                                 this->curr_key_.reset();
555                                 this->curr_data_.reset();
556                         } else if (bulk_retrieval_ &&
557                             (getflags & DB_MULTIPLE_KEY)){
558                                 BDBOP(((DBC*)csr_)->dbp->
559                                     get_pagesize(((DBC*)csr_)->
560                                     dbp, &bulk_bufsz), ret);
561                                 if (bulk_bufsz > d.get_ulen()) {// buf size error
562                                         normalize_bulk_bufsize(bulk_bufsz);
563                                         bulk_retrieval_ = bulk_bufsz;
564                                         enlarge_dbt(d, bulk_bufsz);
565                                         goto retry;
566                                 } else
567                                         throw_bdb_exception(
568                                             "DbCursor<>::increment", ret);
569                         } else
570                                 throw_bdb_exception(
571                                     "DbCursor<>::increment", ret);
572                 }
573
574                 csr_status_ = ret;
575                 return ret;
576         }
577
578         // After each use of key_buf_ and data_buf_, limit their buffer size to
579         // a reasonable size so that they don't waste a big memory space.
580         inline void limit_buf_size_after_use() 
581         {
582                 if (bulk_retrieval_)
583                         // Bulk buffer has to be huge, so don't check it.
584                         return;
585
586                 if (key_buf_.get_ulen() > DBSTL_MAX_KEY_BUF_LEN) {
587                         key_buf_.set_data(DbstlReAlloc(key_buf_.get_data(),
588                             DBSTL_MAX_KEY_BUF_LEN));
589                         key_buf_.set_ulen(DBSTL_MAX_KEY_BUF_LEN);
590                 }
591                 if (data_buf_.get_ulen() > DBSTL_MAX_DATA_BUF_LEN) {
592                         data_buf_.set_data(DbstlReAlloc(data_buf_.get_data(),
593                             DBSTL_MAX_DATA_BUF_LEN));
594                         data_buf_.set_ulen(DBSTL_MAX_DATA_BUF_LEN);
595                 }
596         }
597
598         // Duplicate this object's cursor and set it to dbc1.
599         //
600         inline int dup(DbCursor<key_dt, data_dt>& dbc1) const
601         {
602                 Dbc* pcsr = 0;
603                 int ret;
604
605                 if (csr_ != 0 && csr_->dup(&pcsr, DB_POSITION) == 0) {
606                         dbc1.set_cursor(pcsr);
607                         dbc1.set_owner_db(this->get_owner_db());
608                         dbc1.set_owner_txn(this->get_owner_txn());
609                         ResourceManager::instance()->add_cursor(
610                             this->get_owner_db(), &dbc1);
611                         ret = 0;
612                 } else
613                         ret = ITERATOR_DUP_ERROR;
614
615                 return ret;
616         }
617
618 public:
619         // Open a cursor, do not move it, it is at an invalid position.
620         // All cursors should be opened using this method.
621         //
622         inline int open(db_container *pdbc, int flags)
623         {
624                 int ret;
625
626                 Db *pdb = pdbc->get_db_handle();
627                 if (pdb == NULL)
628                         return 0;
629                 if (csr_) // Close before open.
630                         return 0;
631                 ret = ResourceManager::instance()->
632                     open_cursor(this, pdb, flags);
633                 set_rmw(rmw_get_);
634                 this->csr_status_ = ret;
635                 return ret;
636         }
637
638         // Move Berkeley DB cursor to specified key k, by default use DB_SET,
639         // but DB_SET_RANGE can and may also be used.
640         //
641         int move_to(const key_dt&k, u_int32_t flag = DB_SET)
642         {
643                 Dbt &d1 = data_buf_;
644                 int ret;
645                 u_int32_t sz;
646                 DataItem k1(k, true);
647
648                 if (csr_ == NULL)
649                         return INVALID_ITERATOR_CURSOR;
650
651                 curr_key_.reset();
652                 curr_data_.reset();
653                 inform_duppers();
654
655                 // It is likely that k is not in db, causing get(DB_SET) to
656                 // fail, we should not throw an exception because of this.
657                 //
658                 if (rmw_get_)
659                         flag |= DB_RMW;
660 retry:          ret = csr_->get(&k1.get_dbt(), &d1, flag);
661                 if (ret == 0) {
662                         curr_key_ = k1;
663                         curr_data_.set_dbt(d1, false);
664                         limit_buf_size_after_use();
665                 } else if (ret == DB_BUFFER_SMALL) {
666                         sz = d1.get_size();
667                         assert(sz > 0);
668                         enlarge_dbt(d1, sz);
669                         goto retry;
670                 } else {
671                         if (ret == DB_NOTFOUND) {
672                                 ret = INVALID_ITERATOR_POSITION;
673                                 // Invalidate current values because it is
674                                 // at an invalid position.
675                                 this->curr_key_.reset();
676                                 this->curr_data_.reset();
677                         } else
678                                 throw_bdb_exception("DbCursor<>::move_to", ret);
679                 }
680
681                 csr_status_ = ret;
682                 return ret;
683         }
684
685         // Returns the number of keys equal to the current one.
686         inline size_t count()
687         {
688                 int ret;
689                 db_recno_t cnt;
690
691                 BDBOP2(csr_->count(&cnt, 0), ret, close());
692                 return (size_t)cnt;
693         }
694
695         int insert(const key_dt&k, const data_dt& d, int pos = DB_BEFORE)
696         {
697                 // !!!XXX:
698                 //         We do a deep copy of the input data into a local
699                 //         variable. Apparently not doing so causes issues
700                 //         when using gcc. Even though the put completes prior
701                 //         to returning from this function call.
702                 //         It would be best to avoid this additional copy.
703                 int ret;
704                 // (k, d) pair may be a temporary pair, so we must copy them.
705                 DataItem k1(k, false), d1(d, false);
706
707                 inform_duppers();
708                 if (pos == DB_AFTER) {
709                         ret = this->csr_->put(&k1.get_dbt(), &d1.get_dbt(),
710                             pos);
711                         // May be using this flag for an empty database,
712                         // because begin() an iterator of an empty db_vector
713                         // equals its end() iterator, so use DB_KEYLAST to
714                         // retry.
715                         //
716                         if (ret == EINVAL || ret == 0)
717                                 return ret;
718                         else if (ret)
719                                 throw_bdb_exception("DbCursor<>::insert", ret);
720                 }
721                 if (pos == DB_NODUPDATA)
722                         BDBOP3(this->csr_->put(&k1.get_dbt(), &d1.get_dbt(),
723                             pos), ret, DB_KEYEXIST, close());
724                 else
725                         BDBOP2(this->csr_->put(&k1.get_dbt(), &d1.get_dbt(),
726                             pos), ret, close());
727                 this->csr_status_ = ret;
728                 if (ret == 0) {
729                         curr_key_ = k1;
730                         curr_data_ = d1;
731                 }
732                 // This cursor points to the new key/data pair now.
733                 return ret;
734         }
735
736         // Replace current cursor-pointed data item with d.
737         inline int replace(const data_dt& d)
738         {
739                 Dbt k1;
740                 int ret;
741                 // !!!XXX:
742                 //         We do a deep copy of the input data into a local
743                 //         variable. Apparently not doing so causes issues
744                 //         when using gcc. Even though the put completes prior
745                 //         to returning from this function call.
746                 //         It would be best to avoid this additional copy.
747                 // d may be a temporary object, so we must copy it.
748                 DataItem d1(d, false);
749
750                 
751                 BDBOP2(this->csr_->put(&k1, &d1.get_dbt(), DB_CURRENT),
752                     ret, close());
753                 curr_data_ = d1; // Update current data.
754                 
755                 this->csr_status_ = ret;
756                 return ret;
757         }
758
759         // Remove old key and insert new key-psuodo_data. First insert then
760         // move to old key and remove it so that the cursor remains at the
761         // old key's position, according to DB documentation.
762         // But from practice I can see
763         // the cursor after delete seems not at old position because a for
764         // loop iteration exits prematurelly, not all elements are passed.
765         //
766         inline int replace_key(const key_dt&k)
767         {
768                 data_dt d;
769                 key_dt k0;
770                 int ret;
771
772                 this->get_current_key_data(k0, d);
773                 if (k0 == k)
774                         return 0;
775
776                 DbCursor<key_dt, data_dt> csr2;
777                 this->dup(csr2);
778                 // Delete current, then insert new key/data pair.
779                 ret = csr2.del(); 
780                 ret = csr2.insert(k, d, DB_KEYLAST);
781                 this->csr_status_ = ret;
782                 
783                 // Now this->csr_ is sitting on an invalid position, its 
784                 // iterator is invalidated. Must first move it to the next
785                 // position before using it.
786                 return ret;
787         }
788
789         inline int del()
790         {
791                 int ret;
792
793                 inform_duppers();
794                 BDBOP2(csr_->del(0), ret, close());
795
796                 // By default pos.csr_ will stay at where it was after delete,
797                 // which now is an invalid position. So we need to move to
798                 // next to conform to stl specifications, but we don't move it
799                 // here, iterator::erase should move the iterator itself 
800                 // forward.
801                 //
802                 this->csr_status_ = ret;
803                 return ret;
804         }
805
806         // Make sure the bulk buffer is large enough, and a multiple of 1KB. 
807         // This function may be called prior to cursor initialization, it is 
808         // not possible to verify that the buffer size is a multiple of the 
809         // page size here.
810         u_int32_t normalize_bulk_bufsize(u_int32_t &bulksz)
811         {
812                 if (bulksz == 0)
813                         return 0;
814
815                 while (bulksz < 16 * sizeof(data_dt))
816                         bulksz *= 2;
817
818                 bulksz = bulksz + 1024 - bulksz % 1024;
819
820                 return bulksz;
821         }
822
823         ////////////////////////////////////////////////////////////////////
824         //
825         // Begin public constructors and destructor.
826         //
827         explicit DbCursor(u_int32_t b_bulk_retrieval = 0, bool brmw1 = false,
828             bool directdbget = true) : DbCursorBase(),
829             curr_key_(sizeof(key_dt)), curr_data_(sizeof(data_dt))
830         {
831                 u_int32_t bulksz = sizeof(data_dt); // non-bulk
832                 rmw_get_ = brmw1;
833                 this->bulk_retrieval_ = 
834                     normalize_bulk_bufsize(b_bulk_retrieval);
835                 recno_itr_ = NULL;
836                 multi_itr_ = NULL;
837
838                 if (bulk_retrieval_) {
839                         if (bulksz <= bulk_retrieval_)
840                                 bulksz = bulk_retrieval_;
841                         else {
842                                 normalize_bulk_bufsize(bulksz);
843                                 bulk_retrieval_ = bulksz;
844                         }
845                 }
846                 key_buf_.set_data(DbstlMalloc(sizeof(key_dt)));
847                 key_buf_.set_ulen(sizeof(key_dt));
848                 key_buf_.set_flags(DB_DBT_USERMEM);
849                 data_buf_.set_data(DbstlMalloc(bulksz));
850                 data_buf_.set_ulen(bulksz);
851                 data_buf_.set_flags(DB_DBT_USERMEM);
852                 directdb_get_ = directdbget;
853         }
854
855         // Copy constructor, duplicate cursor here.
856         DbCursor(const DbCursor<key_dt, data_dt>& dbc) :
857             DbCursorBase(dbc),
858             curr_key_(dbc.curr_key_), curr_data_(dbc.curr_data_)
859         {
860                 void *pk, *pd;
861
862                 dbc.dup(*this);
863                 csr_status_ = dbc.csr_status_;
864                 if (csr_ || dbc.csr_)
865                         this->rmw_get_ = set_rmw(dbc.rmw_get_,
866                             ((DBC*)dbc.csr_)->dbenv);
867                 else
868                         rmw_get_ = dbc.rmw_get_;
869
870                 bulk_retrieval_ = dbc.bulk_retrieval_;
871
872                 // Now we have to copy key_buf_ and data_buf_ to support
873                 // multiple retrieval.
874                 key_buf_.set_data(pk = DbstlMalloc(dbc.key_buf_.get_ulen()));
875                 key_buf_.set_ulen(dbc.key_buf_.get_ulen());
876                 key_buf_.set_size(dbc.key_buf_.get_size());
877                 key_buf_.set_flags(DB_DBT_USERMEM);
878                 memcpy(pk, dbc.key_buf_.get_data(), key_buf_.get_ulen());
879
880                 data_buf_.set_data(pd = DbstlMalloc(dbc.data_buf_.get_ulen()));
881                 data_buf_.set_ulen(dbc.data_buf_.get_ulen());
882                 data_buf_.set_size(dbc.data_buf_.get_size());
883                 data_buf_.set_flags(DB_DBT_USERMEM);
884                 memcpy(pd, dbc.data_buf_.get_data(), data_buf_.get_ulen());
885                 if (dbc.recno_itr_) {
886                         recno_itr_ = new DbstlMultipleRecnoDataIterator(
887                             data_buf_);
888                         recno_itr_->set_pointer(dbc.recno_itr_->get_pointer());
889                 } else
890                         recno_itr_ = NULL;
891                 if (dbc.multi_itr_) {
892                         multi_itr_ = new DbstlMultipleKeyDataIterator(
893                             data_buf_);
894                         multi_itr_->set_pointer(dbc.multi_itr_->get_pointer());
895
896                 } else
897                         multi_itr_ = NULL;
898
899                 directdb_get_ = dbc.directdb_get_;
900
901                 // Do not copy sduppers, they are private to each DbCursor<>
902                 // object.
903         }
904
905         virtual ~DbCursor()
906         {
907                 close(); // Call close() ahead of freeing following buffers.
908                 free(key_buf_.get_data());
909                 free(data_buf_.get_data());
910                 if (multi_itr_)
911                         delete multi_itr_;
912                 if (recno_itr_)
913                         delete recno_itr_;
914         }
915
916         ////////////////////////////////////////////////////////////////////
917
918         const DbCursor<key_dt, data_dt>& operator=
919             (const DbCursor<key_dt, data_dt>& dbc)
920         {
921                 void *pk;
922                 u_int32_t ulen;
923
924                 DbCursorBase::operator =(dbc);
925                 dbc.dup(*this);
926                 curr_key_ = dbc.curr_key_;
927                 curr_data_ = dbc.curr_data_;
928                 rmw_get_ = dbc.rmw_get_;
929                 this->bulk_retrieval_ = dbc.bulk_retrieval_;
930                 this->directdb_get_ = dbc.directdb_get_;
931                 // Now we have to copy key_buf_ and data_buf_ to support
932                 // bulk retrieval.
933                 key_buf_.set_data(pk = DbstlReAlloc(key_buf_.get_data(),
934                     ulen = dbc.key_buf_.get_ulen()));
935                 key_buf_.set_ulen(ulen);
936                 key_buf_.set_size(dbc.key_buf_.get_size());
937                 key_buf_.set_flags(DB_DBT_USERMEM);
938                 memcpy(pk, dbc.key_buf_.get_data(), ulen);
939
940                 data_buf_.set_data(pk = DbstlReAlloc(key_buf_.get_data(),
941                     ulen = dbc.key_buf_.get_ulen()));
942                 data_buf_.set_ulen(ulen);
943                 data_buf_.set_size(dbc.data_buf_.get_size());
944                 data_buf_.set_flags(DB_DBT_USERMEM);
945                 memcpy(pk, dbc.key_buf_.get_data(), ulen);
946
947                 if (dbc.recno_itr_) {
948                         if (recno_itr_) {
949                                 delete recno_itr_;
950                                 recno_itr_ = NULL;
951                         }
952                         recno_itr_ = new DbstlMultipleRecnoDataIterator(
953                             data_buf_);
954                         recno_itr_->set_pointer(dbc.recno_itr_->get_pointer());
955                 } else if (recno_itr_) {
956                         delete recno_itr_;
957                         recno_itr_ = NULL;
958                 }
959
960                 if (dbc.multi_itr_) {
961                         if (multi_itr_) {
962                                 delete multi_itr_;
963                                 multi_itr_ = NULL;
964                         }
965                         multi_itr_ = new DbstlMultipleKeyDataIterator(
966                             data_buf_);
967                         multi_itr_->set_pointer(dbc.multi_itr_->get_pointer());
968
969                 } else if (multi_itr_) {
970                         delete multi_itr_;
971                         multi_itr_ = NULL;
972                 }
973
974                 return dbc;
975                 // Do not copy sduppers, they are private to each DbCursor<>
976                 // object.
977
978         }
979
980         // Move Dbc*cursor to next position. If doing bulk read, read from
981         // the bulk buffer. If bulk buffer exhausted, do another bulk read
982         // from database, and then read from the bulk buffer. Quit if no
983         // more data in database.
984         //
985         int next(int flag = DB_NEXT)
986         {
987                 Dbt k, d;
988                 db_recno_t recno;
989                 int ret;
990
991 retry:          if (bulk_retrieval_) {
992                         if (multi_itr_) {
993                                 if (multi_itr_->next(k, d)) {
994                                         curr_key_.set_dbt(k, false);
995                                         curr_data_.set_dbt(d, false);
996                                         return 0;
997                                 } else {
998                                         delete multi_itr_;
999                                         multi_itr_ = NULL;
1000                                 }
1001                         }
1002                         if (recno_itr_) {
1003                                 if (recno_itr_->next(recno, d)) {
1004                                         curr_key_.set_dbt(k, false);
1005                                         curr_data_.set_dbt(d, false);
1006                                         return 0;
1007                                 } else {
1008                                         delete recno_itr_;
1009                                         recno_itr_ = NULL;
1010                                 }
1011                         }
1012                 }
1013                 ret = increment(flag);
1014                 if (bulk_retrieval_ && ret == 0)
1015                         goto retry;
1016                 return ret;
1017         }
1018
1019         inline int prev(int flag = DB_PREV)
1020         {
1021                 return increment(flag);
1022         }
1023
1024         // Move Dbc*cursor to first element. If doing bulk read, read data
1025         // from bulk buffer.
1026         int first()
1027         {
1028                 Dbt k, d;
1029                 db_recno_t recno;
1030                 int ret;
1031
1032                 ret = increment(DB_FIRST);
1033                 if (bulk_retrieval_) {
1034                         if (multi_itr_) {
1035                                 if (multi_itr_->next(k, d)) {
1036                                         curr_key_.set_dbt(k, false);
1037                                         curr_data_.set_dbt(d, false);
1038                                         return 0;
1039                                 } else {
1040                                         delete multi_itr_;
1041                                         multi_itr_ = NULL;
1042                                 }
1043                         }
1044                         if (recno_itr_) {
1045                                 if (recno_itr_->next(recno, d)) {
1046                                         curr_key_.set_dbt(k, false);
1047                                         curr_data_.set_dbt(d, false);
1048                                         return 0;
1049                                 } else {
1050                                         delete recno_itr_;
1051                                         recno_itr_ = NULL;
1052                                 }
1053                         }
1054                 }
1055
1056                 return ret;
1057         }
1058
1059         inline int last()
1060         {
1061                 return increment(DB_LAST);
1062         }
1063
1064         // Get current key/data pair, shallow copy. Return 0 on success,
1065         // -1 if no data.
1066         inline int get_current_key_data(key_dt&k, data_dt&d)
1067         {
1068                 if (directdb_get_)
1069                         update_current_key_data_from_db(
1070                             DbCursorBase::SKIP_NONE);
1071                 if (curr_key_.get_data(k) == 0 && curr_data_.get_data(d) == 0)
1072                         return 0;
1073                 else 
1074                         return INVALID_KEY_DATA;
1075         }
1076
1077         // Get current data, shallow copy. Return 0 on success, -1 if no data.
1078         inline int get_current_data(data_dt&d)
1079         {
1080                 if (directdb_get_)
1081                         update_current_key_data_from_db(DbCursorBase::SKIP_KEY);
1082                 if (curr_data_.get_data(d) == 0)
1083                         return 0;
1084                 else 
1085                         return INVALID_KEY_DATA;
1086         }
1087
1088         // Get current key, shallow copy. Return 0 on success, -1 if no data.
1089         inline int get_current_key(key_dt&k)
1090         {
1091                 if (directdb_get_)
1092                         update_current_key_data_from_db(
1093                             DbCursorBase::SKIP_DATA);
1094                 if (curr_key_.get_data(k) == 0)
1095                         return 0;
1096                 else 
1097                         return INVALID_KEY_DATA;
1098         }
1099
1100         inline void close()
1101         {
1102                 if (csr_) {
1103                         inform_duppers();
1104                         ResourceManager::instance()->remove_cursor(this);
1105                 }
1106                 csr_ = NULL;
1107         }
1108
1109         // Parameter skipkd specifies skip retrieving key or data:
1110         // If 0, don't skip, retrieve both;
1111         // If 1, skip retrieving key;
1112         // If 2, skip retrieving data.
1113         // Do not poll from db again if doing bulk retrieval.
1114         void update_current_key_data_from_db(DbcGetSkipOptions skipkd) {
1115                 int ret;
1116                 u_int32_t sz, sz1, kflags = DB_DBT_USERMEM,
1117                     dflags = DB_DBT_USERMEM;
1118                 // Do not poll from db again if doing bulk retrieval.
1119                 if (this->bulk_retrieval_)
1120                         return;
1121                 if (this->csr_status_ != 0) {
1122                         curr_key_.reset();
1123                         curr_data_.reset();
1124                         return;
1125                 }
1126                 
1127                 // We will modify flags if skip key or data, so cache old
1128                 // value and set it after get calls.
1129                 if (skipkd != DbCursorBase::SKIP_NONE) {
1130                         kflags = key_buf_.get_flags();
1131                         dflags = data_buf_.get_flags();
1132                 }
1133                 if (skipkd == DbCursorBase::SKIP_KEY) {
1134                         key_buf_.set_dlen(0);
1135                         key_buf_.set_flags(DB_DBT_PARTIAL | DB_DBT_USERMEM);
1136                 }
1137
1138                 if (skipkd == DbCursorBase::SKIP_DATA) {
1139                         data_buf_.set_dlen(0);
1140                         data_buf_.set_flags(DB_DBT_PARTIAL | DB_DBT_USERMEM);
1141                 }
1142 retry:          ret = csr_->get(&key_buf_, &data_buf_, DB_CURRENT);
1143                 if (ret == 0) {
1144                         if (skipkd != DbCursorBase::SKIP_KEY)
1145                                 curr_key_ = key_buf_;
1146                         if (skipkd != DbCursorBase::SKIP_DATA)
1147                                 curr_data_ = data_buf_;
1148                         limit_buf_size_after_use();
1149                 } else if (ret == DB_BUFFER_SMALL) {
1150                         if ((sz = key_buf_.get_size()) > 0)
1151                                 enlarge_dbt(key_buf_, sz);
1152                         if ((sz1 = data_buf_.get_size()) > 0) 
1153                                 enlarge_dbt(data_buf_, sz1);
1154                         if (sz == 0 && sz1 == 0)
1155                                 THROW0(InvalidDbtException);
1156                         goto retry;
1157                 } else {
1158                         if (skipkd != DbCursorBase::SKIP_NONE) {
1159                                 key_buf_.set_flags(kflags);
1160                                 data_buf_.set_flags(dflags);
1161                         }
1162                         throw_bdb_exception(
1163                         "DbCursor<>::update_current_key_data_from_db", ret);
1164                 }
1165
1166                 if (skipkd != DbCursorBase::SKIP_NONE) {
1167                         key_buf_.set_flags(kflags);
1168                         data_buf_.set_flags(dflags);
1169                 }
1170         }
1171 }; // DbCursor<>
1172
1173 ////////////////////////////////////////////////////////////////////////
1174 ////////////////////////////////////////////////////////////////////////
1175 //
1176 // RandDbCursor class template definition
1177 //
1178 // RandDbCursor is a random accessible cursor wrapper for use by
1179 // db_vector_iterator, it derives from DbCursor<> class. It has a fixed key
1180 // data type, which is index_type.
1181 //
1182 typedef db_recno_t index_type;
1183 template<Typename data_dt>
1184 class RandDbCursor : public DbCursor<index_type, data_dt>
1185 {
1186 protected:
1187         friend class DataItem;
1188         typedef ssize_t difference_type;
1189 public:
1190         typedef RandDbCursor<data_dt> self;
1191         typedef DbCursor<index_type, data_dt> base;
1192
1193         // Return current csr_ pointed element's index in recno database
1194         // (i.e. the index starting from 1). csr_ must be open and
1195         // point to an existing key/data pair.
1196         //
1197         inline index_type get_current_index() const
1198         {
1199                 index_type ndx;
1200
1201                 if (this->directdb_get_)
1202                         ((self *)this)->update_current_key_data_from_db(
1203                             DbCursorBase::SKIP_DATA);
1204                 this->curr_key_.get_data(ndx);
1205                 return ndx;
1206         }
1207
1208         inline int compare(const self *csr2) const{
1209                 index_type i1, i2;
1210
1211                 i1 = this->get_current_index();
1212                 i2 = csr2->get_current_index();
1213                 return i1 - i2;
1214         }
1215
1216         // Insert data d before/after current position.
1217         int insert(const data_dt& d, int pos = DB_BEFORE){
1218                 int k = 1, ret;
1219                 //data_dt dta;
1220
1221                 // Inserting into empty db, must set key to 1.
1222                 if (pos == DB_KEYLAST)
1223                         k = 1;
1224
1225                 ret = base::insert(k, d, pos);
1226
1227                 // Inserting into a empty db using begin() itr, so flag is
1228                 // DB_AFTER and surely failed, so change to use DB_KEYLAST
1229                 // and try again.
1230                 if (ret == EINVAL) {
1231                         k = 1;
1232                         pos = DB_KEYLAST;
1233                         ret = base::insert(k, d, pos);
1234                 }
1235                 this->csr_status_ = ret;
1236                 return ret;
1237         }
1238
1239         /*
1240          * Move the cursor n positions, if reaches the beginning or end,
1241          * returns DB_NOTFOUND.
1242          */
1243         int advance(difference_type n)
1244         {
1245                 int ret = 0;
1246                 index_type indx;
1247                 u_int32_t sz, flags = 0;
1248
1249                 indx = this->get_current_index();
1250                 if (n == 0)
1251                         return 0;
1252
1253                 index_type i = (index_type)n;
1254                 indx += i;
1255
1256                 if (n < 0 && indx < 1) { // Index in recno db starts from 1.
1257
1258                         ret = INVALID_ITERATOR_POSITION;
1259                         return ret;
1260                 }
1261                 this->inform_duppers();
1262
1263                 // Do a search to determine whether new position is valid.
1264                 Dbt k, &d = this->data_buf_;
1265
1266                 
1267                 k.set_data(&indx);
1268                 k.set_size(sizeof(indx));
1269                 if (this->rmw_get_)
1270                         flags |= DB_RMW;
1271
1272 retry:          if (this->csr_ && 
1273                     ((ret = this->csr_->get(&k, &d, DB_SET)) == DB_NOTFOUND)) {
1274                         this->csr_status_ = ret = INVALID_ITERATOR_POSITION;
1275                         this->curr_key_.reset();
1276                         this->curr_data_.reset();
1277                 } else if (ret == DB_BUFFER_SMALL) {
1278                         sz = d.get_size();
1279                         assert(sz > 0);
1280                         this->enlarge_dbt(d, sz);
1281                         goto retry;
1282                 } else if (ret == 0) {
1283                         this->curr_key_.set_dbt(k, false);
1284                         this->curr_data_.set_dbt(d, false);
1285                         this->limit_buf_size_after_use();
1286                 } else
1287                         throw_bdb_exception("RandDbCursor<>::advance", ret);
1288                 this->csr_status_ = ret;
1289                 return ret;
1290         }
1291
1292         // Return the last index of recno db (index starting from 1),
1293         // it will also move the underlying cursor to last key/data pair.
1294         //
1295         inline index_type last_index()
1296         {
1297                 int ret;
1298
1299                 ret = this->last();
1300                 if (ret)
1301                         return 0;// Invalid position.
1302                 else
1303                         return get_current_index();
1304         }
1305
1306         explicit RandDbCursor(u_int32_t b_bulk_retrieval = 0,
1307             bool b_rmw1 = false, bool directdbget = true)
1308             : base(b_bulk_retrieval, b_rmw1, directdbget)
1309         {
1310         }
1311
1312         RandDbCursor(const RandDbCursor<data_dt>& rdbc) : base(rdbc)
1313         {
1314         }
1315
1316         explicit RandDbCursor(Dbc* csr1, int posidx = 0) : base(csr1)
1317         {
1318         }
1319
1320         virtual ~RandDbCursor()
1321         {
1322         }
1323
1324 }; // RandDbCursor<>
1325
1326 END_NS //ns dbstl
1327
1328 #endif // !_DB_STL_DBC_H