2019-04-16 14:58:56 +02:00
// Copyright (C) MongoDB, Inc. 2017-present.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License. You may obtain
// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
package mongo
import (
"context"
2019-11-27 15:29:34 +01:00
"errors"
"fmt"
2023-07-06 20:44:12 +02:00
"time"
2019-04-16 14:58:56 +02:00
2019-11-27 15:29:34 +01:00
"go.mongodb.org/mongo-driver/bson"
2019-04-16 14:58:56 +02:00
"go.mongodb.org/mongo-driver/bson/bsoncodec"
2022-10-26 15:56:35 +02:00
"go.mongodb.org/mongo-driver/internal"
2021-05-20 20:53:56 +02:00
"go.mongodb.org/mongo-driver/mongo/description"
2019-04-16 14:58:56 +02:00
"go.mongodb.org/mongo-driver/mongo/options"
"go.mongodb.org/mongo-driver/mongo/readconcern"
"go.mongodb.org/mongo-driver/mongo/readpref"
"go.mongodb.org/mongo-driver/mongo/writeconcern"
2021-05-20 20:53:56 +02:00
"go.mongodb.org/mongo-driver/x/bsonx/bsoncore"
2019-04-16 14:58:56 +02:00
"go.mongodb.org/mongo-driver/x/mongo/driver"
2019-11-27 15:29:34 +01:00
"go.mongodb.org/mongo-driver/x/mongo/driver/operation"
"go.mongodb.org/mongo-driver/x/mongo/driver/session"
)
var (
defaultRunCmdOpts = [ ] * options . RunCmdOptions { options . RunCmd ( ) . SetReadPreference ( readpref . Primary ( ) ) }
2019-04-16 14:58:56 +02:00
)
2020-05-14 19:09:55 +02:00
// Database is a handle to a MongoDB database. It is safe for concurrent use by multiple goroutines.
2019-04-16 14:58:56 +02:00
type Database struct {
client * Client
name string
readConcern * readconcern . ReadConcern
writeConcern * writeconcern . WriteConcern
readPreference * readpref . ReadPref
readSelector description . ServerSelector
writeSelector description . ServerSelector
2023-07-06 20:44:12 +02:00
bsonOpts * options . BSONOptions
2019-04-16 14:58:56 +02:00
registry * bsoncodec . Registry
}
func newDatabase ( client * Client , name string , opts ... * options . DatabaseOptions ) * Database {
dbOpt := options . MergeDatabaseOptions ( opts ... )
rc := client . readConcern
if dbOpt . ReadConcern != nil {
rc = dbOpt . ReadConcern
}
rp := client . readPreference
if dbOpt . ReadPreference != nil {
rp = dbOpt . ReadPreference
}
wc := client . writeConcern
if dbOpt . WriteConcern != nil {
wc = dbOpt . WriteConcern
}
2023-07-06 20:44:12 +02:00
bsonOpts := client . bsonOpts
if dbOpt . BSONOptions != nil {
bsonOpts = dbOpt . BSONOptions
}
2019-11-27 15:29:34 +01:00
reg := client . registry
if dbOpt . Registry != nil {
reg = dbOpt . Registry
}
2019-04-16 14:58:56 +02:00
db := & Database {
client : client ,
name : name ,
readPreference : rp ,
readConcern : rc ,
writeConcern : wc ,
2023-07-06 20:44:12 +02:00
bsonOpts : bsonOpts ,
2019-11-27 15:29:34 +01:00
registry : reg ,
2019-04-16 14:58:56 +02:00
}
db . readSelector = description . CompositeSelector ( [ ] description . ServerSelector {
description . ReadPrefSelector ( db . readPreference ) ,
description . LatencySelector ( db . client . localThreshold ) ,
} )
db . writeSelector = description . CompositeSelector ( [ ] description . ServerSelector {
description . WriteSelector ( ) ,
description . LatencySelector ( db . client . localThreshold ) ,
} )
return db
}
2020-05-14 19:09:55 +02:00
// Client returns the Client the Database was created from.
2019-04-16 14:58:56 +02:00
func ( db * Database ) Client ( ) * Client {
return db . client
}
// Name returns the name of the database.
func ( db * Database ) Name ( ) string {
return db . name
}
2020-05-14 19:09:55 +02:00
// Collection gets a handle for a collection with the given name configured with the given CollectionOptions.
2019-04-16 14:58:56 +02:00
func ( db * Database ) Collection ( name string , opts ... * options . CollectionOptions ) * Collection {
return newCollection ( db , name , opts ... )
}
2020-05-14 19:09:55 +02:00
// Aggregate executes an aggregate command the database. This requires MongoDB version >= 3.6 and driver version >=
// 1.1.0.
2019-11-27 15:29:34 +01:00
//
2020-05-14 19:09:55 +02:00
// The pipeline parameter must be a slice of documents, each representing an aggregation stage. The pipeline
// cannot be nil but can be empty. The stage documents must all be non-nil. For a pipeline of bson.D documents, the
// mongo.Pipeline type can be used. See
2022-10-26 15:56:35 +02:00
// https://www.mongodb.com/docs/manual/reference/operator/aggregation-pipeline/#db-aggregate-stages for a list of valid
2020-05-14 19:09:55 +02:00
// stages in database-level aggregations.
//
// The opts parameter can be used to specify options for this operation (see the options.AggregateOptions documentation).
//
2022-10-26 15:56:35 +02:00
// For more information about the command, see https://www.mongodb.com/docs/manual/reference/command/aggregate/.
2019-11-27 15:29:34 +01:00
func ( db * Database ) Aggregate ( ctx context . Context , pipeline interface { } ,
opts ... * options . AggregateOptions ) ( * Cursor , error ) {
a := aggregateParams {
ctx : ctx ,
pipeline : pipeline ,
client : db . client ,
registry : db . registry ,
readConcern : db . readConcern ,
writeConcern : db . writeConcern ,
retryRead : db . client . retryReads ,
db : db . name ,
readSelector : db . readSelector ,
writeSelector : db . writeSelector ,
readPreference : db . readPreference ,
opts : opts ,
2019-04-16 14:58:56 +02:00
}
2019-11-27 15:29:34 +01:00
return aggregate ( a )
}
2019-04-16 14:58:56 +02:00
2019-11-27 15:29:34 +01:00
func ( db * Database ) processRunCommand ( ctx context . Context , cmd interface { } ,
2021-05-20 20:53:56 +02:00
cursorCommand bool , opts ... * options . RunCmdOptions ) ( * operation . Command , * session . Client , error ) {
2019-04-16 14:58:56 +02:00
sess := sessionFromContext ( ctx )
2020-05-14 19:09:55 +02:00
if sess == nil && db . client . sessionPool != nil {
2023-07-06 20:44:12 +02:00
sess = session . NewImplicitClientSession ( db . client . sessionPool , db . client . id )
2019-11-27 15:29:34 +01:00
}
2019-04-16 14:58:56 +02:00
2019-11-27 15:29:34 +01:00
err := db . client . validSession ( sess )
if err != nil {
return nil , sess , err
2019-04-16 14:58:56 +02:00
}
2019-11-27 15:29:34 +01:00
ro := options . MergeRunCmdOptions ( append ( defaultRunCmdOpts , opts ... ) ... )
if sess != nil && sess . TransactionRunning ( ) && ro . ReadPreference != nil && ro . ReadPreference . Mode ( ) != readpref . PrimaryMode {
return nil , sess , errors . New ( "read preference in a transaction must be primary" )
2019-04-16 14:58:56 +02:00
}
2023-07-06 20:44:12 +02:00
if isUnorderedMap ( cmd ) {
return nil , sess , ErrMapForOrderedArgument { "cmd" }
}
runCmdDoc , err := marshal ( cmd , db . bsonOpts , db . registry )
2019-04-16 14:58:56 +02:00
if err != nil {
2019-11-27 15:29:34 +01:00
return nil , sess , err
2019-04-16 14:58:56 +02:00
}
readSelect := description . CompositeSelector ( [ ] description . ServerSelector {
2019-11-27 15:29:34 +01:00
description . ReadPrefSelector ( ro . ReadPreference ) ,
2019-04-16 14:58:56 +02:00
description . LatencySelector ( db . client . localThreshold ) ,
} )
2019-11-27 15:29:34 +01:00
if sess != nil && sess . PinnedServer != nil {
2021-05-20 20:53:56 +02:00
readSelect = makePinnedSelector ( sess , readSelect )
2019-11-27 15:29:34 +01:00
}
2019-04-16 14:58:56 +02:00
2021-05-20 20:53:56 +02:00
var op * operation . Command
switch cursorCommand {
case true :
cursorOpts := db . client . createBaseCursorOptions ( )
op = operation . NewCursorCommand ( runCmdDoc , cursorOpts )
default :
op = operation . NewCommand ( runCmdDoc )
}
2023-07-06 20:44:12 +02:00
// TODO(GODRIVER-2649): ReadConcern(db.readConcern) will not actually pass the database's
// read concern. Remove this note once readConcern is correctly passed to the operation
// level.
2021-05-20 20:53:56 +02:00
return op . Session ( sess ) . CommandMonitor ( db . client . monitor ) .
2019-11-27 15:29:34 +01:00
ServerSelector ( readSelect ) . ClusterClock ( db . client . clock ) .
2021-05-20 20:53:56 +02:00
Database ( db . name ) . Deployment ( db . client . deployment ) . ReadConcern ( db . readConcern ) .
2022-10-26 15:56:35 +02:00
Crypt ( db . client . cryptFLE ) . ReadPreference ( ro . ReadPreference ) . ServerAPI ( db . client . serverAPI ) .
2023-07-06 20:44:12 +02:00
Timeout ( db . client . timeout ) . Logger ( db . client . logger ) , sess , nil
2019-04-16 14:58:56 +02:00
}
2021-05-20 20:53:56 +02:00
// RunCommand executes the given command against the database. This function does not obey the Database's read
// preference. To specify a read preference, the RunCmdOptions.ReadPreference option must be used.
2020-05-14 19:09:55 +02:00
//
// The runCommand parameter must be a document for the command to be executed. It cannot be nil.
// This must be an order-preserving type such as bson.D. Map types such as bson.M are not valid.
//
// The opts parameter can be used to specify options for this operation (see the options.RunCmdOptions documentation).
2022-10-26 15:56:35 +02:00
//
// The behavior of RunCommand is undefined if the command document contains any of the following:
// - A session ID or any transaction-specific fields
// - API versioning options when an API version is already declared on the Client
// - maxTimeMS when Timeout is set on the Client
2019-04-16 14:58:56 +02:00
func ( db * Database ) RunCommand ( ctx context . Context , runCommand interface { } , opts ... * options . RunCmdOptions ) * SingleResult {
if ctx == nil {
ctx = context . Background ( )
}
2021-05-20 20:53:56 +02:00
op , sess , err := db . processRunCommand ( ctx , runCommand , false , opts ... )
2019-11-27 15:29:34 +01:00
defer closeImplicitSession ( sess )
2019-04-16 14:58:56 +02:00
if err != nil {
return & SingleResult { err : err }
}
2019-11-27 15:29:34 +01:00
err = op . Execute ( ctx )
2021-05-20 20:53:56 +02:00
// RunCommand can be used to run a write, thus execute may return a write error
_ , convErr := processWriteError ( err )
2019-11-27 15:29:34 +01:00
return & SingleResult {
2023-07-06 20:44:12 +02:00
ctx : ctx ,
err : convErr ,
rdr : bson . Raw ( op . Result ( ) ) ,
bsonOpts : db . bsonOpts ,
reg : db . registry ,
2019-11-27 15:29:34 +01:00
}
2019-04-16 14:58:56 +02:00
}
2020-05-14 19:09:55 +02:00
// RunCommandCursor executes the given command against the database and parses the response as a cursor. If the command
2021-05-20 20:53:56 +02:00
// being executed does not return a cursor (e.g. insert), the command will be executed on the server and an error will
// be returned because the server response cannot be parsed as a cursor. This function does not obey the Database's read
// preference. To specify a read preference, the RunCmdOptions.ReadPreference option must be used.
2020-05-14 19:09:55 +02:00
//
// The runCommand parameter must be a document for the command to be executed. It cannot be nil.
// This must be an order-preserving type such as bson.D. Map types such as bson.M are not valid.
//
// The opts parameter can be used to specify options for this operation (see the options.RunCmdOptions documentation).
2022-10-26 15:56:35 +02:00
//
// The behavior of RunCommandCursor is undefined if the command document contains any of the following:
// - A session ID or any transaction-specific fields
// - API versioning options when an API version is already declared on the Client
// - maxTimeMS when Timeout is set on the Client
2019-04-16 14:58:56 +02:00
func ( db * Database ) RunCommandCursor ( ctx context . Context , runCommand interface { } , opts ... * options . RunCmdOptions ) ( * Cursor , error ) {
if ctx == nil {
ctx = context . Background ( )
}
2021-05-20 20:53:56 +02:00
op , sess , err := db . processRunCommand ( ctx , runCommand , true , opts ... )
2019-04-16 14:58:56 +02:00
if err != nil {
2019-11-27 15:29:34 +01:00
closeImplicitSession ( sess )
return nil , replaceErrors ( err )
2019-04-16 14:58:56 +02:00
}
2019-11-27 15:29:34 +01:00
if err = op . Execute ( ctx ) ; err != nil {
closeImplicitSession ( sess )
2019-04-16 14:58:56 +02:00
return nil , replaceErrors ( err )
}
2021-05-20 20:53:56 +02:00
bc , err := op . ResultCursor ( )
2019-11-27 15:29:34 +01:00
if err != nil {
closeImplicitSession ( sess )
return nil , replaceErrors ( err )
}
2023-07-06 20:44:12 +02:00
cursor , err := newCursorWithSession ( bc , db . bsonOpts , db . registry , sess )
2019-04-16 14:58:56 +02:00
return cursor , replaceErrors ( err )
}
2020-05-14 19:09:55 +02:00
// Drop drops the database on the server. This method ignores "namespace not found" errors so it is safe to drop
// a database that does not exist on the server.
2019-04-16 14:58:56 +02:00
func ( db * Database ) Drop ( ctx context . Context ) error {
if ctx == nil {
ctx = context . Background ( )
}
sess := sessionFromContext ( ctx )
2020-05-14 19:09:55 +02:00
if sess == nil && db . client . sessionPool != nil {
2023-07-06 20:44:12 +02:00
sess = session . NewImplicitClientSession ( db . client . sessionPool , db . client . id )
2019-11-27 15:29:34 +01:00
defer sess . EndSession ( )
}
2019-04-16 14:58:56 +02:00
err := db . client . validSession ( sess )
if err != nil {
return err
}
2019-11-27 15:29:34 +01:00
wc := db . writeConcern
if sess . TransactionRunning ( ) {
wc = nil
}
if ! writeconcern . AckWrite ( wc ) {
sess = nil
2019-04-16 14:58:56 +02:00
}
2019-11-27 15:29:34 +01:00
selector := makePinnedSelector ( sess , db . writeSelector )
op := operation . NewDropDatabase ( ) .
Session ( sess ) . WriteConcern ( wc ) . CommandMonitor ( db . client . monitor ) .
ServerSelector ( selector ) . ClusterClock ( db . client . clock ) .
2021-05-20 20:53:56 +02:00
Database ( db . name ) . Deployment ( db . client . deployment ) . Crypt ( db . client . cryptFLE ) .
ServerAPI ( db . client . serverAPI )
2019-11-27 15:29:34 +01:00
err = op . Execute ( ctx )
driverErr , ok := err . ( driver . Error )
if err != nil && ( ! ok || ! driverErr . NamespaceNotFound ( ) ) {
2019-04-16 14:58:56 +02:00
return replaceErrors ( err )
}
return nil
}
2021-05-20 20:53:56 +02:00
// ListCollectionSpecifications executes a listCollections command and returns a slice of CollectionSpecification
// instances representing the collections in the database.
//
// The filter parameter must be a document containing query operators and can be used to select which collections
// are included in the result. It cannot be nil. An empty document (e.g. bson.D{}) should be used to include all
// collections.
//
// The opts parameter can be used to specify options for the operation (see the options.ListCollectionsOptions
// documentation).
//
2022-10-26 15:56:35 +02:00
// For more information about the command, see https://www.mongodb.com/docs/manual/reference/command/listCollections/.
2022-04-26 18:41:22 +02:00
//
// BUG(benjirewis): ListCollectionSpecifications prevents listing more than 100 collections per database when running
// against MongoDB version 2.6.
2021-05-20 20:53:56 +02:00
func ( db * Database ) ListCollectionSpecifications ( ctx context . Context , filter interface { } ,
opts ... * options . ListCollectionsOptions ) ( [ ] * CollectionSpecification , error ) {
cursor , err := db . ListCollections ( ctx , filter , opts ... )
if err != nil {
return nil , err
}
var specs [ ] * CollectionSpecification
err = cursor . All ( ctx , & specs )
if err != nil {
return nil , err
}
for _ , spec := range specs {
// Pre-4.4 servers report a namespace in their responses, so we only set Namespace manually if it was not in
// the response.
if spec . IDIndex != nil && spec . IDIndex . Namespace == "" {
spec . IDIndex . Namespace = db . name + "." + spec . Name
}
}
return specs , nil
}
2020-05-14 19:09:55 +02:00
// ListCollections executes a listCollections command and returns a cursor over the collections in the database.
//
// The filter parameter must be a document containing query operators and can be used to select which collections
// are included in the result. It cannot be nil. An empty document (e.g. bson.D{}) should be used to include all
// collections.
//
// The opts parameter can be used to specify options for the operation (see the options.ListCollectionsOptions
// documentation).
//
2022-10-26 15:56:35 +02:00
// For more information about the command, see https://www.mongodb.com/docs/manual/reference/command/listCollections/.
2022-04-26 18:41:22 +02:00
//
// BUG(benjirewis): ListCollections prevents listing more than 100 collections per database when running against
// MongoDB version 2.6.
2019-04-16 14:58:56 +02:00
func ( db * Database ) ListCollections ( ctx context . Context , filter interface { } , opts ... * options . ListCollectionsOptions ) ( * Cursor , error ) {
if ctx == nil {
ctx = context . Background ( )
}
2023-07-06 20:44:12 +02:00
filterDoc , err := marshal ( filter , db . bsonOpts , db . registry )
2019-04-16 14:58:56 +02:00
if err != nil {
return nil , err
}
2019-11-27 15:29:34 +01:00
sess := sessionFromContext ( ctx )
2020-05-14 19:09:55 +02:00
if sess == nil && db . client . sessionPool != nil {
2023-07-06 20:44:12 +02:00
sess = session . NewImplicitClientSession ( db . client . sessionPool , db . client . id )
2019-04-16 14:58:56 +02:00
}
2019-11-27 15:29:34 +01:00
err = db . client . validSession ( sess )
if err != nil {
closeImplicitSession ( sess )
return nil , err
2019-04-16 14:58:56 +02:00
}
2019-11-27 15:29:34 +01:00
selector := description . CompositeSelector ( [ ] description . ServerSelector {
2019-04-16 14:58:56 +02:00
description . ReadPrefSelector ( readpref . Primary ( ) ) ,
description . LatencySelector ( db . client . localThreshold ) ,
} )
2019-11-27 15:29:34 +01:00
selector = makeReadPrefSelector ( sess , selector , db . client . localThreshold )
lco := options . MergeListCollectionsOptions ( opts ... )
op := operation . NewListCollections ( filterDoc ) .
Session ( sess ) . ReadPreference ( db . readPreference ) . CommandMonitor ( db . client . monitor ) .
ServerSelector ( selector ) . ClusterClock ( db . client . clock ) .
2021-05-20 20:53:56 +02:00
Database ( db . name ) . Deployment ( db . client . deployment ) . Crypt ( db . client . cryptFLE ) .
2022-10-26 15:56:35 +02:00
ServerAPI ( db . client . serverAPI ) . Timeout ( db . client . timeout )
2021-05-20 20:53:56 +02:00
cursorOpts := db . client . createBaseCursorOptions ( )
2019-11-27 15:29:34 +01:00
if lco . NameOnly != nil {
op = op . NameOnly ( * lco . NameOnly )
}
2021-05-20 20:53:56 +02:00
if lco . BatchSize != nil {
cursorOpts . BatchSize = * lco . BatchSize
op = op . BatchSize ( * lco . BatchSize )
}
2022-04-26 18:41:22 +02:00
if lco . AuthorizedCollections != nil {
op = op . AuthorizedCollections ( * lco . AuthorizedCollections )
}
2021-05-20 20:53:56 +02:00
2019-11-27 15:29:34 +01:00
retry := driver . RetryNone
if db . client . retryReads {
retry = driver . RetryOncePerCommand
}
op = op . Retry ( retry )
err = op . Execute ( ctx )
2019-04-16 14:58:56 +02:00
if err != nil {
2019-11-27 15:29:34 +01:00
closeImplicitSession ( sess )
2019-04-16 14:58:56 +02:00
return nil , replaceErrors ( err )
}
2021-05-20 20:53:56 +02:00
bc , err := op . Result ( cursorOpts )
2019-11-27 15:29:34 +01:00
if err != nil {
closeImplicitSession ( sess )
return nil , replaceErrors ( err )
}
2023-07-06 20:44:12 +02:00
cursor , err := newCursorWithSession ( bc , db . bsonOpts , db . registry , sess )
2019-04-16 14:58:56 +02:00
return cursor , replaceErrors ( err )
}
2020-05-14 19:09:55 +02:00
// ListCollectionNames executes a listCollections command and returns a slice containing the names of the collections
// in the database. This method requires driver version >= 1.1.0.
//
// The filter parameter must be a document containing query operators and can be used to select which collections
// are included in the result. It cannot be nil. An empty document (e.g. bson.D{}) should be used to include all
// collections.
//
// The opts parameter can be used to specify options for the operation (see the options.ListCollectionsOptions
// documentation).
//
2022-10-26 15:56:35 +02:00
// For more information about the command, see https://www.mongodb.com/docs/manual/reference/command/listCollections/.
2022-04-26 18:41:22 +02:00
//
// BUG(benjirewis): ListCollectionNames prevents listing more than 100 collections per database when running against
// MongoDB version 2.6.
2019-11-27 15:29:34 +01:00
func ( db * Database ) ListCollectionNames ( ctx context . Context , filter interface { } , opts ... * options . ListCollectionsOptions ) ( [ ] string , error ) {
opts = append ( opts , options . ListCollections ( ) . SetNameOnly ( true ) )
res , err := db . ListCollections ( ctx , filter , opts ... )
if err != nil {
return nil , err
}
defer res . Close ( ctx )
names := make ( [ ] string , 0 )
for res . Next ( ctx ) {
2022-10-26 15:56:35 +02:00
elem , err := res . Current . LookupErr ( "name" )
2019-11-27 15:29:34 +01:00
if err != nil {
return nil , err
}
2022-10-26 15:56:35 +02:00
if elem . Type != bson . TypeString {
return nil , fmt . Errorf ( "incorrect type for 'name'. got %v. want %v" , elem . Type , bson . TypeString )
2019-11-27 15:29:34 +01:00
}
elemName := elem . StringValue ( )
names = append ( names , elemName )
}
res . Close ( ctx )
return names , nil
}
2020-05-14 19:09:55 +02:00
// ReadConcern returns the read concern used to configure the Database object.
2019-04-16 14:58:56 +02:00
func ( db * Database ) ReadConcern ( ) * readconcern . ReadConcern {
return db . readConcern
}
2020-05-14 19:09:55 +02:00
// ReadPreference returns the read preference used to configure the Database object.
2019-04-16 14:58:56 +02:00
func ( db * Database ) ReadPreference ( ) * readpref . ReadPref {
return db . readPreference
}
2020-05-14 19:09:55 +02:00
// WriteConcern returns the write concern used to configure the Database object.
2019-04-16 14:58:56 +02:00
func ( db * Database ) WriteConcern ( ) * writeconcern . WriteConcern {
return db . writeConcern
}
2020-05-14 19:09:55 +02:00
// Watch returns a change stream for all changes to the corresponding database. See
2022-10-26 15:56:35 +02:00
// https://www.mongodb.com/docs/manual/changeStreams/ for more information about change streams.
2020-05-14 19:09:55 +02:00
//
// The Database must be configured with read concern majority or no read concern for a change stream to be created
// successfully.
//
// The pipeline parameter must be a slice of documents, each representing a pipeline stage. The pipeline cannot be
2022-10-26 15:56:35 +02:00
// nil but can be empty. The stage documents must all be non-nil. See https://www.mongodb.com/docs/manual/changeStreams/ for
2020-05-14 19:09:55 +02:00
// a list of pipeline stages that can be used with change streams. For a pipeline of bson.D documents, the
// mongo.Pipeline{} type can be used.
//
// The opts parameter can be used to specify options for change stream creation (see the options.ChangeStreamOptions
// documentation).
2019-04-16 14:58:56 +02:00
func ( db * Database ) Watch ( ctx context . Context , pipeline interface { } ,
opts ... * options . ChangeStreamOptions ) ( * ChangeStream , error ) {
2019-11-27 15:29:34 +01:00
csConfig := changeStreamConfig {
readConcern : db . readConcern ,
readPreference : db . readPreference ,
client : db . client ,
registry : db . registry ,
streamType : DatabaseStream ,
databaseName : db . Name ( ) ,
2021-05-20 20:53:56 +02:00
crypt : db . client . cryptFLE ,
2019-11-27 15:29:34 +01:00
}
return newChangeStream ( ctx , csConfig , pipeline , opts ... )
2019-04-16 14:58:56 +02:00
}
2021-05-20 20:53:56 +02:00
// CreateCollection executes a create command to explicitly create a new collection with the specified name on the
// server. If the collection being created already exists, this method will return a mongo.CommandError. This method
// requires driver version 1.4.0 or higher.
//
// The opts parameter can be used to specify options for the operation (see the options.CreateCollectionOptions
// documentation).
2021-08-23 15:26:35 +02:00
//
2022-10-26 15:56:35 +02:00
// For more information about the command, see https://www.mongodb.com/docs/manual/reference/command/create/.
2021-05-20 20:53:56 +02:00
func ( db * Database ) CreateCollection ( ctx context . Context , name string , opts ... * options . CreateCollectionOptions ) error {
2022-10-26 15:56:35 +02:00
cco := options . MergeCreateCollectionOptions ( opts ... )
// Follow Client-Side Encryption specification to check for encryptedFields.
// Check for encryptedFields from create options.
ef := cco . EncryptedFields
// Check for encryptedFields from the client EncryptedFieldsMap.
if ef == nil {
ef = db . getEncryptedFieldsFromMap ( name )
}
if ef != nil {
return db . createCollectionWithEncryptedFields ( ctx , name , ef , opts ... )
}
return db . createCollection ( ctx , name , opts ... )
}
// getEncryptedFieldsFromServer tries to get an "encryptedFields" document associated with collectionName by running the "listCollections" command.
// Returns nil and no error if the listCollections command succeeds, but "encryptedFields" is not present.
func ( db * Database ) getEncryptedFieldsFromServer ( ctx context . Context , collectionName string ) ( interface { } , error ) {
// Check if collection has an EncryptedFields configured server-side.
collSpecs , err := db . ListCollectionSpecifications ( ctx , bson . D { { "name" , collectionName } } )
if err != nil {
return nil , err
}
if len ( collSpecs ) == 0 {
return nil , nil
}
if len ( collSpecs ) > 1 {
return nil , fmt . Errorf ( "expected 1 or 0 results from listCollections, got %v" , len ( collSpecs ) )
}
collSpec := collSpecs [ 0 ]
rawValue , err := collSpec . Options . LookupErr ( "encryptedFields" )
if err == bsoncore . ErrElementNotFound {
return nil , nil
} else if err != nil {
return nil , err
}
encryptedFields , ok := rawValue . DocumentOK ( )
if ! ok {
return nil , fmt . Errorf ( "expected encryptedFields of %v to be document, got %v" , collectionName , rawValue . Type )
}
return encryptedFields , nil
}
// getEncryptedFieldsFromServer tries to get an "encryptedFields" document associated with collectionName by checking the client EncryptedFieldsMap.
// Returns nil and no error if an EncryptedFieldsMap is not configured, or does not contain an entry for collectionName.
func ( db * Database ) getEncryptedFieldsFromMap ( collectionName string ) interface { } {
// Check the EncryptedFieldsMap
efMap := db . client . encryptedFieldsMap
if efMap == nil {
return nil
}
namespace := db . name + "." + collectionName
ef , ok := efMap [ namespace ]
if ok {
return ef
}
return nil
}
// createCollectionWithEncryptedFields creates a collection with an EncryptedFields.
func ( db * Database ) createCollectionWithEncryptedFields ( ctx context . Context , name string , ef interface { } , opts ... * options . CreateCollectionOptions ) error {
2023-07-06 20:44:12 +02:00
efBSON , err := marshal ( ef , db . bsonOpts , db . registry )
2022-10-26 15:56:35 +02:00
if err != nil {
return fmt . Errorf ( "error transforming document: %v" , err )
}
2023-07-06 20:44:12 +02:00
// Check the wire version to ensure server is 7.0.0 or newer.
// After the wire version check, and before creating the collections, it is possible the server state changes.
// That is OK. This wire version check is a best effort to inform users earlier if using a QEv2 driver with a QEv1 server.
{
const QEv2WireVersion = 21
server , err := db . client . deployment . SelectServer ( ctx , description . WriteSelector ( ) )
if err != nil {
return fmt . Errorf ( "error selecting server to check maxWireVersion: %w" , err )
}
conn , err := server . Connection ( ctx )
if err != nil {
return fmt . Errorf ( "error getting connection to check maxWireVersion: %w" , err )
}
defer conn . Close ( )
wireVersionRange := conn . Description ( ) . WireVersion
if wireVersionRange . Max < QEv2WireVersion {
return fmt . Errorf ( "Driver support of Queryable Encryption is incompatible with server. Upgrade server to use Queryable Encryption. Got maxWireVersion %v but need maxWireVersion >= %v" , wireVersionRange . Max , QEv2WireVersion )
}
}
// Create the two encryption-related, associated collections: `escCollection` and `ecocCollection`.
2022-10-26 15:56:35 +02:00
stateCollectionOpts := options . CreateCollection ( ) .
SetClusteredIndex ( bson . D { { "key" , bson . D { { "_id" , 1 } } } , { "unique" , true } } )
// Create ESCCollection.
escCollection , err := internal . GetEncryptedStateCollectionName ( efBSON , name , internal . EncryptedStateCollection )
if err != nil {
return err
}
if err := db . createCollection ( ctx , escCollection , stateCollectionOpts ) ; err != nil {
return err
}
// Create ECOCCollection.
ecocCollection , err := internal . GetEncryptedStateCollectionName ( efBSON , name , internal . EncryptedCompactionCollection )
if err != nil {
return err
}
if err := db . createCollection ( ctx , ecocCollection , stateCollectionOpts ) ; err != nil {
return err
}
// Create a data collection with the 'encryptedFields' option.
op , err := db . createCollectionOperation ( name , opts ... )
if err != nil {
return err
}
op . EncryptedFields ( efBSON )
if err := db . executeCreateOperation ( ctx , op ) ; err != nil {
return err
}
// Create an index on the __safeContent__ field in the collection @collectionName.
if _ , err := db . Collection ( name ) . Indexes ( ) . CreateOne ( ctx , IndexModel { Keys : bson . D { { "__safeContent__" , 1 } } } ) ; err != nil {
return fmt . Errorf ( "error creating safeContent index: %v" , err )
}
return nil
}
// createCollection creates a collection without EncryptedFields.
func ( db * Database ) createCollection ( ctx context . Context , name string , opts ... * options . CreateCollectionOptions ) error {
op , err := db . createCollectionOperation ( name , opts ... )
if err != nil {
return err
}
return db . executeCreateOperation ( ctx , op )
}
func ( db * Database ) createCollectionOperation ( name string , opts ... * options . CreateCollectionOptions ) ( * operation . Create , error ) {
2021-05-20 20:53:56 +02:00
cco := options . MergeCreateCollectionOptions ( opts ... )
op := operation . NewCreate ( name ) . ServerAPI ( db . client . serverAPI )
if cco . Capped != nil {
op . Capped ( * cco . Capped )
}
if cco . Collation != nil {
op . Collation ( bsoncore . Document ( cco . Collation . ToDocument ( ) ) )
}
2022-10-26 15:56:35 +02:00
if cco . ChangeStreamPreAndPostImages != nil {
2023-07-06 20:44:12 +02:00
csppi , err := marshal ( cco . ChangeStreamPreAndPostImages , db . bsonOpts , db . registry )
2022-10-26 15:56:35 +02:00
if err != nil {
return nil , err
}
op . ChangeStreamPreAndPostImages ( csppi )
}
2021-05-20 20:53:56 +02:00
if cco . DefaultIndexOptions != nil {
idx , doc := bsoncore . AppendDocumentStart ( nil )
if cco . DefaultIndexOptions . StorageEngine != nil {
2023-07-06 20:44:12 +02:00
storageEngine , err := marshal ( cco . DefaultIndexOptions . StorageEngine , db . bsonOpts , db . registry )
2021-05-20 20:53:56 +02:00
if err != nil {
2022-10-26 15:56:35 +02:00
return nil , err
2021-05-20 20:53:56 +02:00
}
doc = bsoncore . AppendDocumentElement ( doc , "storageEngine" , storageEngine )
}
doc , err := bsoncore . AppendDocumentEnd ( doc , idx )
if err != nil {
2022-10-26 15:56:35 +02:00
return nil , err
2021-05-20 20:53:56 +02:00
}
op . IndexOptionDefaults ( doc )
}
if cco . MaxDocuments != nil {
op . Max ( * cco . MaxDocuments )
}
if cco . SizeInBytes != nil {
op . Size ( * cco . SizeInBytes )
}
if cco . StorageEngine != nil {
2023-07-06 20:44:12 +02:00
storageEngine , err := marshal ( cco . StorageEngine , db . bsonOpts , db . registry )
2021-05-20 20:53:56 +02:00
if err != nil {
2022-10-26 15:56:35 +02:00
return nil , err
2021-05-20 20:53:56 +02:00
}
op . StorageEngine ( storageEngine )
}
if cco . ValidationAction != nil {
op . ValidationAction ( * cco . ValidationAction )
}
if cco . ValidationLevel != nil {
op . ValidationLevel ( * cco . ValidationLevel )
}
if cco . Validator != nil {
2023-07-06 20:44:12 +02:00
validator , err := marshal ( cco . Validator , db . bsonOpts , db . registry )
2021-05-20 20:53:56 +02:00
if err != nil {
2022-10-26 15:56:35 +02:00
return nil , err
2021-05-20 20:53:56 +02:00
}
op . Validator ( validator )
}
2021-08-23 15:26:35 +02:00
if cco . ExpireAfterSeconds != nil {
op . ExpireAfterSeconds ( * cco . ExpireAfterSeconds )
}
if cco . TimeSeriesOptions != nil {
idx , doc := bsoncore . AppendDocumentStart ( nil )
doc = bsoncore . AppendStringElement ( doc , "timeField" , cco . TimeSeriesOptions . TimeField )
if cco . TimeSeriesOptions . MetaField != nil {
doc = bsoncore . AppendStringElement ( doc , "metaField" , * cco . TimeSeriesOptions . MetaField )
}
if cco . TimeSeriesOptions . Granularity != nil {
doc = bsoncore . AppendStringElement ( doc , "granularity" , * cco . TimeSeriesOptions . Granularity )
}
2023-07-06 20:44:12 +02:00
if cco . TimeSeriesOptions . BucketMaxSpan != nil {
bmss := int64 ( * cco . TimeSeriesOptions . BucketMaxSpan / time . Second )
doc = bsoncore . AppendInt64Element ( doc , "bucketMaxSpanSeconds" , bmss )
}
if cco . TimeSeriesOptions . BucketRounding != nil {
brs := int64 ( * cco . TimeSeriesOptions . BucketRounding / time . Second )
doc = bsoncore . AppendInt64Element ( doc , "bucketRoundingSeconds" , brs )
}
2021-08-23 15:26:35 +02:00
doc , err := bsoncore . AppendDocumentEnd ( doc , idx )
if err != nil {
2022-10-26 15:56:35 +02:00
return nil , err
2021-08-23 15:26:35 +02:00
}
op . TimeSeries ( doc )
}
2022-10-26 15:56:35 +02:00
if cco . ClusteredIndex != nil {
2023-07-06 20:44:12 +02:00
clusteredIndex , err := marshal ( cco . ClusteredIndex , db . bsonOpts , db . registry )
2022-10-26 15:56:35 +02:00
if err != nil {
return nil , err
}
op . ClusteredIndex ( clusteredIndex )
}
2021-05-20 20:53:56 +02:00
2022-10-26 15:56:35 +02:00
return op , nil
2021-05-20 20:53:56 +02:00
}
// CreateView executes a create command to explicitly create a view on the server. See
2022-10-26 15:56:35 +02:00
// https://www.mongodb.com/docs/manual/core/views/ for more information about views. This method requires driver version >=
2021-05-20 20:53:56 +02:00
// 1.4.0 and MongoDB version >= 3.4.
//
// The viewName parameter specifies the name of the view to create.
//
2023-07-06 20:44:12 +02:00
// # The viewOn parameter specifies the name of the collection or view on which this view will be created
2021-05-20 20:53:56 +02:00
//
// The pipeline parameter specifies an aggregation pipeline that will be exececuted against the source collection or
// view to create this view.
//
// The opts parameter can be used to specify options for the operation (see the options.CreateViewOptions
// documentation).
func ( db * Database ) CreateView ( ctx context . Context , viewName , viewOn string , pipeline interface { } ,
opts ... * options . CreateViewOptions ) error {
2023-07-06 20:44:12 +02:00
pipelineArray , _ , err := marshalAggregatePipeline ( pipeline , db . bsonOpts , db . registry )
2021-05-20 20:53:56 +02:00
if err != nil {
return err
}
op := operation . NewCreate ( viewName ) .
ViewOn ( viewOn ) .
Pipeline ( pipelineArray ) .
ServerAPI ( db . client . serverAPI )
cvo := options . MergeCreateViewOptions ( opts ... )
if cvo . Collation != nil {
op . Collation ( bsoncore . Document ( cvo . Collation . ToDocument ( ) ) )
}
return db . executeCreateOperation ( ctx , op )
}
func ( db * Database ) executeCreateOperation ( ctx context . Context , op * operation . Create ) error {
sess := sessionFromContext ( ctx )
if sess == nil && db . client . sessionPool != nil {
2023-07-06 20:44:12 +02:00
sess = session . NewImplicitClientSession ( db . client . sessionPool , db . client . id )
2021-05-20 20:53:56 +02:00
defer sess . EndSession ( )
}
err := db . client . validSession ( sess )
if err != nil {
return err
}
wc := db . writeConcern
if sess . TransactionRunning ( ) {
wc = nil
}
if ! writeconcern . AckWrite ( wc ) {
sess = nil
}
selector := makePinnedSelector ( sess , db . writeSelector )
op = op . Session ( sess ) .
WriteConcern ( wc ) .
CommandMonitor ( db . client . monitor ) .
ServerSelector ( selector ) .
ClusterClock ( db . client . clock ) .
Database ( db . name ) .
Deployment ( db . client . deployment ) .
Crypt ( db . client . cryptFLE )
return replaceErrors ( op . Execute ( ctx ) )
}