mirror of https://github.com/go-gitea/gitea.git
1747 lines
45 KiB
Go
1747 lines
45 KiB
Go
package couchbase
|
|
|
|
import (
|
|
"bufio"
|
|
"bytes"
|
|
"crypto/tls"
|
|
"crypto/x509"
|
|
"encoding/base64"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"io/ioutil"
|
|
"math/rand"
|
|
"net/http"
|
|
"net/url"
|
|
"runtime"
|
|
"sort"
|
|
"strconv"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
"unsafe"
|
|
|
|
"github.com/couchbase/goutils/logging"
|
|
|
|
"github.com/couchbase/gomemcached" // package name is 'gomemcached'
|
|
"github.com/couchbase/gomemcached/client" // package name is 'memcached'
|
|
)
|
|
|
|
// HTTPClient to use for REST and view operations.
|
|
var MaxIdleConnsPerHost = 256
|
|
var ClientTimeOut = 10 * time.Second
|
|
var HTTPTransport = &http.Transport{MaxIdleConnsPerHost: MaxIdleConnsPerHost}
|
|
var HTTPClient = &http.Client{Transport: HTTPTransport, Timeout: ClientTimeOut}
|
|
|
|
// Use this client for reading from streams that should be open for an extended duration.
|
|
var HTTPClientForStreaming = &http.Client{Transport: HTTPTransport, Timeout: 0}
|
|
|
|
// PoolSize is the size of each connection pool (per host).
|
|
var PoolSize = 64
|
|
|
|
// PoolOverflow is the number of overflow connections allowed in a
|
|
// pool.
|
|
var PoolOverflow = 16
|
|
|
|
// AsynchronousCloser turns on asynchronous closing for overflow connections
|
|
var AsynchronousCloser = false
|
|
|
|
// TCP KeepAlive enabled/disabled
|
|
var TCPKeepalive = false
|
|
|
|
// Enable MutationToken
|
|
var EnableMutationToken = false
|
|
|
|
// Enable Data Type response
|
|
var EnableDataType = false
|
|
|
|
// Enable Xattr
|
|
var EnableXattr = false
|
|
|
|
// Enable Collections
|
|
var EnableCollections = false
|
|
|
|
// TCP keepalive interval in seconds. Default 30 minutes
|
|
var TCPKeepaliveInterval = 30 * 60
|
|
|
|
// Used to decide whether to skip verification of certificates when
|
|
// connecting to an ssl port.
|
|
var skipVerify = true
|
|
var certFile = ""
|
|
var keyFile = ""
|
|
var rootFile = ""
|
|
|
|
func SetSkipVerify(skip bool) {
|
|
skipVerify = skip
|
|
}
|
|
|
|
func SetCertFile(cert string) {
|
|
certFile = cert
|
|
}
|
|
|
|
func SetKeyFile(cert string) {
|
|
keyFile = cert
|
|
}
|
|
|
|
func SetRootFile(cert string) {
|
|
rootFile = cert
|
|
}
|
|
|
|
// Allow applications to speciify the Poolsize and Overflow
|
|
func SetConnectionPoolParams(size, overflow int) {
|
|
|
|
if size > 0 {
|
|
PoolSize = size
|
|
}
|
|
|
|
if overflow > 0 {
|
|
PoolOverflow = overflow
|
|
}
|
|
}
|
|
|
|
// Turn off overflow connections
|
|
func DisableOverflowConnections() {
|
|
PoolOverflow = 0
|
|
}
|
|
|
|
// Toggle asynchronous overflow closer
|
|
func EnableAsynchronousCloser(closer bool) {
|
|
AsynchronousCloser = closer
|
|
}
|
|
|
|
// Allow TCP keepalive parameters to be set by the application
|
|
func SetTcpKeepalive(enabled bool, interval int) {
|
|
|
|
TCPKeepalive = enabled
|
|
|
|
if interval > 0 {
|
|
TCPKeepaliveInterval = interval
|
|
}
|
|
}
|
|
|
|
// AuthHandler is a callback that gets the auth username and password
|
|
// for the given bucket.
|
|
type AuthHandler interface {
|
|
GetCredentials() (string, string, string)
|
|
}
|
|
|
|
// AuthHandler is a callback that gets the auth username and password
|
|
// for the given bucket and sasl for memcached.
|
|
type AuthWithSaslHandler interface {
|
|
AuthHandler
|
|
GetSaslCredentials() (string, string)
|
|
}
|
|
|
|
// MultiBucketAuthHandler is kind of AuthHandler that may perform
|
|
// different auth for different buckets.
|
|
type MultiBucketAuthHandler interface {
|
|
AuthHandler
|
|
ForBucket(bucket string) AuthHandler
|
|
}
|
|
|
|
// HTTPAuthHandler is kind of AuthHandler that performs more general
|
|
// for outgoing http requests than is possible via simple
|
|
// GetCredentials() call (i.e. digest auth or different auth per
|
|
// different destinations).
|
|
type HTTPAuthHandler interface {
|
|
AuthHandler
|
|
SetCredsForRequest(req *http.Request) error
|
|
}
|
|
|
|
// RestPool represents a single pool returned from the pools REST API.
|
|
type RestPool struct {
|
|
Name string `json:"name"`
|
|
StreamingURI string `json:"streamingUri"`
|
|
URI string `json:"uri"`
|
|
}
|
|
|
|
// Pools represents the collection of pools as returned from the REST API.
|
|
type Pools struct {
|
|
ComponentsVersion map[string]string `json:"componentsVersion,omitempty"`
|
|
ImplementationVersion string `json:"implementationVersion"`
|
|
IsAdmin bool `json:"isAdminCreds"`
|
|
UUID string `json:"uuid"`
|
|
Pools []RestPool `json:"pools"`
|
|
}
|
|
|
|
// A Node is a computer in a cluster running the couchbase software.
|
|
type Node struct {
|
|
ClusterCompatibility int `json:"clusterCompatibility"`
|
|
ClusterMembership string `json:"clusterMembership"`
|
|
CouchAPIBase string `json:"couchApiBase"`
|
|
Hostname string `json:"hostname"`
|
|
AlternateNames map[string]NodeAlternateNames `json:"alternateAddresses"`
|
|
InterestingStats map[string]float64 `json:"interestingStats,omitempty"`
|
|
MCDMemoryAllocated float64 `json:"mcdMemoryAllocated"`
|
|
MCDMemoryReserved float64 `json:"mcdMemoryReserved"`
|
|
MemoryFree float64 `json:"memoryFree"`
|
|
MemoryTotal float64 `json:"memoryTotal"`
|
|
OS string `json:"os"`
|
|
Ports map[string]int `json:"ports"`
|
|
Services []string `json:"services"`
|
|
Status string `json:"status"`
|
|
Uptime int `json:"uptime,string"`
|
|
Version string `json:"version"`
|
|
ThisNode bool `json:"thisNode,omitempty"`
|
|
}
|
|
|
|
// A Pool of nodes and buckets.
|
|
type Pool struct {
|
|
BucketMap map[string]*Bucket
|
|
Nodes []Node
|
|
|
|
BucketURL map[string]string `json:"buckets"`
|
|
|
|
MemoryQuota float64 `json:"memoryQuota"`
|
|
CbasMemoryQuota float64 `json:"cbasMemoryQuota"`
|
|
EventingMemoryQuota float64 `json:"eventingMemoryQuota"`
|
|
FtsMemoryQuota float64 `json:"ftsMemoryQuota"`
|
|
IndexMemoryQuota float64 `json:"indexMemoryQuota"`
|
|
|
|
client *Client
|
|
}
|
|
|
|
// VBucketServerMap is the a mapping of vbuckets to nodes.
|
|
type VBucketServerMap struct {
|
|
HashAlgorithm string `json:"hashAlgorithm"`
|
|
NumReplicas int `json:"numReplicas"`
|
|
ServerList []string `json:"serverList"`
|
|
VBucketMap [][]int `json:"vBucketMap"`
|
|
}
|
|
|
|
type DurablitySettings struct {
|
|
Persist PersistTo
|
|
Observe ObserveTo
|
|
}
|
|
|
|
// Bucket is the primary entry point for most data operations.
|
|
// Bucket is a locked data structure. All access to its fields should be done using read or write locking,
|
|
// as appropriate.
|
|
//
|
|
// Some access methods require locking, but rely on the caller to do so. These are appropriate
|
|
// for calls from methods that have already locked the structure. Methods like this
|
|
// take a boolean parameter "bucketLocked".
|
|
type Bucket struct {
|
|
sync.RWMutex
|
|
AuthType string `json:"authType"`
|
|
Capabilities []string `json:"bucketCapabilities"`
|
|
CapabilitiesVersion string `json:"bucketCapabilitiesVer"`
|
|
CollectionsManifestUid string `json:"collectionsManifestUid"`
|
|
Type string `json:"bucketType"`
|
|
Name string `json:"name"`
|
|
NodeLocator string `json:"nodeLocator"`
|
|
Quota map[string]float64 `json:"quota,omitempty"`
|
|
Replicas int `json:"replicaNumber"`
|
|
Password string `json:"saslPassword"`
|
|
URI string `json:"uri"`
|
|
StreamingURI string `json:"streamingUri"`
|
|
LocalRandomKeyURI string `json:"localRandomKeyUri,omitempty"`
|
|
UUID string `json:"uuid"`
|
|
ConflictResolutionType string `json:"conflictResolutionType,omitempty"`
|
|
DDocs struct {
|
|
URI string `json:"uri"`
|
|
} `json:"ddocs,omitempty"`
|
|
BasicStats map[string]interface{} `json:"basicStats,omitempty"`
|
|
Controllers map[string]interface{} `json:"controllers,omitempty"`
|
|
|
|
// These are used for JSON IO, but isn't used for processing
|
|
// since it needs to be swapped out safely.
|
|
VBSMJson VBucketServerMap `json:"vBucketServerMap"`
|
|
NodesJSON []Node `json:"nodes"`
|
|
|
|
pool *Pool
|
|
connPools unsafe.Pointer // *[]*connectionPool
|
|
vBucketServerMap unsafe.Pointer // *VBucketServerMap
|
|
nodeList unsafe.Pointer // *[]Node
|
|
commonSufix string
|
|
ah AuthHandler // auth handler
|
|
ds *DurablitySettings // Durablity Settings for this bucket
|
|
closed bool
|
|
}
|
|
|
|
// PoolServices is all the bucket-independent services in a pool
|
|
type PoolServices struct {
|
|
Rev int `json:"rev"`
|
|
NodesExt []NodeServices `json:"nodesExt"`
|
|
Capabilities json.RawMessage `json:"clusterCapabilities"`
|
|
}
|
|
|
|
// NodeServices is all the bucket-independent services running on
|
|
// a node (given by Hostname)
|
|
type NodeServices struct {
|
|
Services map[string]int `json:"services,omitempty"`
|
|
Hostname string `json:"hostname"`
|
|
ThisNode bool `json:"thisNode"`
|
|
AlternateNames map[string]NodeAlternateNames `json:"alternateAddresses"`
|
|
}
|
|
|
|
type NodeAlternateNames struct {
|
|
Hostname string `json:"hostname"`
|
|
Ports map[string]int `json:"ports"`
|
|
}
|
|
|
|
type BucketNotFoundError struct {
|
|
bucket string
|
|
}
|
|
|
|
func (e *BucketNotFoundError) Error() string {
|
|
return fmt.Sprint("No bucket named " + e.bucket)
|
|
}
|
|
|
|
type BucketAuth struct {
|
|
name string
|
|
saslPwd string
|
|
bucket string
|
|
}
|
|
|
|
func newBucketAuth(name string, pass string, bucket string) *BucketAuth {
|
|
return &BucketAuth{name: name, saslPwd: pass, bucket: bucket}
|
|
}
|
|
|
|
func (ba *BucketAuth) GetCredentials() (string, string, string) {
|
|
return ba.name, ba.saslPwd, ba.bucket
|
|
}
|
|
|
|
// VBServerMap returns the current VBucketServerMap.
|
|
func (b *Bucket) VBServerMap() *VBucketServerMap {
|
|
b.RLock()
|
|
defer b.RUnlock()
|
|
ret := (*VBucketServerMap)(b.vBucketServerMap)
|
|
return ret
|
|
}
|
|
|
|
func (b *Bucket) GetVBmap(addrs []string) (map[string][]uint16, error) {
|
|
vbmap := b.VBServerMap()
|
|
servers := vbmap.ServerList
|
|
if addrs == nil {
|
|
addrs = vbmap.ServerList
|
|
}
|
|
|
|
m := make(map[string][]uint16)
|
|
for _, addr := range addrs {
|
|
m[addr] = make([]uint16, 0)
|
|
}
|
|
for vbno, idxs := range vbmap.VBucketMap {
|
|
if len(idxs) == 0 {
|
|
return nil, fmt.Errorf("vbmap: No KV node no for vb %d", vbno)
|
|
} else if idxs[0] < 0 || idxs[0] >= len(servers) {
|
|
return nil, fmt.Errorf("vbmap: Invalid KV node no %d for vb %d", idxs[0], vbno)
|
|
}
|
|
addr := servers[idxs[0]]
|
|
if _, ok := m[addr]; ok {
|
|
m[addr] = append(m[addr], uint16(vbno))
|
|
}
|
|
}
|
|
return m, nil
|
|
}
|
|
|
|
// true if node is not on the bucket VBmap
|
|
func (b *Bucket) checkVBmap(node string) bool {
|
|
vbmap := b.VBServerMap()
|
|
servers := vbmap.ServerList
|
|
|
|
for _, idxs := range vbmap.VBucketMap {
|
|
if len(idxs) == 0 {
|
|
return true
|
|
} else if idxs[0] < 0 || idxs[0] >= len(servers) {
|
|
return true
|
|
}
|
|
if servers[idxs[0]] == node {
|
|
return false
|
|
}
|
|
}
|
|
return true
|
|
}
|
|
|
|
func (b *Bucket) GetName() string {
|
|
b.RLock()
|
|
defer b.RUnlock()
|
|
ret := b.Name
|
|
return ret
|
|
}
|
|
|
|
func (b *Bucket) GetUUID() string {
|
|
b.RLock()
|
|
defer b.RUnlock()
|
|
ret := b.UUID
|
|
return ret
|
|
}
|
|
|
|
// Nodes returns the current list of nodes servicing this bucket.
|
|
func (b *Bucket) Nodes() []Node {
|
|
b.RLock()
|
|
defer b.RUnlock()
|
|
ret := *(*[]Node)(b.nodeList)
|
|
return ret
|
|
}
|
|
|
|
// return the list of healthy nodes
|
|
func (b *Bucket) HealthyNodes() []Node {
|
|
nodes := []Node{}
|
|
|
|
for _, n := range b.Nodes() {
|
|
if n.Status == "healthy" && n.CouchAPIBase != "" {
|
|
nodes = append(nodes, n)
|
|
}
|
|
if n.Status != "healthy" { // log non-healthy node
|
|
logging.Infof("Non-healthy node; node details:")
|
|
logging.Infof("Hostname=%v, Status=%v, CouchAPIBase=%v, ThisNode=%v", n.Hostname, n.Status, n.CouchAPIBase, n.ThisNode)
|
|
}
|
|
}
|
|
|
|
return nodes
|
|
}
|
|
|
|
func (b *Bucket) getConnPools(bucketLocked bool) []*connectionPool {
|
|
if !bucketLocked {
|
|
b.RLock()
|
|
defer b.RUnlock()
|
|
}
|
|
if b.connPools != nil {
|
|
return *(*[]*connectionPool)(b.connPools)
|
|
} else {
|
|
return nil
|
|
}
|
|
}
|
|
|
|
func (b *Bucket) replaceConnPools(with []*connectionPool) {
|
|
b.Lock()
|
|
defer b.Unlock()
|
|
|
|
old := b.connPools
|
|
b.connPools = unsafe.Pointer(&with)
|
|
if old != nil {
|
|
for _, pool := range *(*[]*connectionPool)(old) {
|
|
if pool != nil {
|
|
pool.Close()
|
|
}
|
|
}
|
|
}
|
|
return
|
|
}
|
|
|
|
func (b *Bucket) getConnPool(i int) *connectionPool {
|
|
|
|
if i < 0 {
|
|
return nil
|
|
}
|
|
|
|
p := b.getConnPools(false /* not already locked */)
|
|
if len(p) > i {
|
|
return p[i]
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (b *Bucket) getConnPoolByHost(host string, bucketLocked bool) *connectionPool {
|
|
pools := b.getConnPools(bucketLocked)
|
|
for _, p := range pools {
|
|
if p != nil && p.host == host {
|
|
return p
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// Given a vbucket number, returns a memcached connection to it.
|
|
// The connection must be returned to its pool after use.
|
|
func (b *Bucket) getConnectionToVBucket(vb uint32) (*memcached.Client, *connectionPool, error) {
|
|
for {
|
|
vbm := b.VBServerMap()
|
|
if len(vbm.VBucketMap) < int(vb) {
|
|
return nil, nil, fmt.Errorf("go-couchbase: vbmap smaller than vbucket list: %v vs. %v",
|
|
vb, vbm.VBucketMap)
|
|
}
|
|
masterId := vbm.VBucketMap[vb][0]
|
|
if masterId < 0 {
|
|
return nil, nil, fmt.Errorf("go-couchbase: No master for vbucket %d", vb)
|
|
}
|
|
pool := b.getConnPool(masterId)
|
|
conn, err := pool.Get()
|
|
if err != errClosedPool {
|
|
return conn, pool, err
|
|
}
|
|
// If conn pool was closed, because another goroutine refreshed the vbucket map, retry...
|
|
}
|
|
}
|
|
|
|
// To get random documents, we need to cover all the nodes, so select
|
|
// a connection at random.
|
|
|
|
func (b *Bucket) getRandomConnection() (*memcached.Client, *connectionPool, error) {
|
|
for {
|
|
var currentPool = 0
|
|
pools := b.getConnPools(false /* not already locked */)
|
|
if len(pools) == 0 {
|
|
return nil, nil, fmt.Errorf("No connection pool found")
|
|
} else if len(pools) > 1 { // choose a random connection
|
|
currentPool = rand.Intn(len(pools))
|
|
} // if only one pool, currentPool defaults to 0, i.e., the only pool
|
|
|
|
// get the pool
|
|
pool := pools[currentPool]
|
|
conn, err := pool.Get()
|
|
if err != errClosedPool {
|
|
return conn, pool, err
|
|
}
|
|
|
|
// If conn pool was closed, because another goroutine refreshed the vbucket map, retry...
|
|
}
|
|
}
|
|
|
|
//
|
|
// Get a random document from a bucket. Since the bucket may be distributed
|
|
// across nodes, we must first select a random connection, and then use the
|
|
// Client.GetRandomDoc() call to get a random document from that node.
|
|
//
|
|
|
|
func (b *Bucket) GetRandomDoc(context ...*memcached.ClientContext) (*gomemcached.MCResponse, error) {
|
|
// get a connection from the pool
|
|
conn, pool, err := b.getRandomConnection()
|
|
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
conn.SetDeadline(getDeadline(time.Time{}, DefaultTimeout))
|
|
|
|
// We may need to select the bucket before GetRandomDoc()
|
|
// will work. This is sometimes done at startup (see defaultMkConn())
|
|
// but not always, depending on the auth type.
|
|
_, err = conn.SelectBucket(b.Name)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// get a randomm document from the connection
|
|
doc, err := conn.GetRandomDoc(context...)
|
|
// need to return the connection to the pool
|
|
pool.Return(conn)
|
|
return doc, err
|
|
}
|
|
|
|
// Bucket DDL
|
|
func uriAdj(s string) string {
|
|
return strings.Replace(s, "%", "%25", -1)
|
|
}
|
|
|
|
func (b *Bucket) CreateScope(scope string) error {
|
|
b.RLock()
|
|
pool := b.pool
|
|
client := pool.client
|
|
b.RUnlock()
|
|
args := map[string]interface{}{"name": scope}
|
|
return client.parsePostURLResponseTerse("/pools/default/buckets/"+uriAdj(b.Name)+"/collections", args, nil)
|
|
}
|
|
|
|
func (b *Bucket) DropScope(scope string) error {
|
|
b.RLock()
|
|
pool := b.pool
|
|
client := pool.client
|
|
b.RUnlock()
|
|
return client.parseDeleteURLResponseTerse("/pools/default/buckets/"+uriAdj(b.Name)+"/collections/"+uriAdj(scope), nil, nil)
|
|
}
|
|
|
|
func (b *Bucket) CreateCollection(scope string, collection string) error {
|
|
b.RLock()
|
|
pool := b.pool
|
|
client := pool.client
|
|
b.RUnlock()
|
|
args := map[string]interface{}{"name": collection}
|
|
return client.parsePostURLResponseTerse("/pools/default/buckets/"+uriAdj(b.Name)+"/collections/"+uriAdj(scope), args, nil)
|
|
}
|
|
|
|
func (b *Bucket) DropCollection(scope string, collection string) error {
|
|
b.RLock()
|
|
pool := b.pool
|
|
client := pool.client
|
|
b.RUnlock()
|
|
return client.parseDeleteURLResponseTerse("/pools/default/buckets/"+uriAdj(b.Name)+"/collections/"+uriAdj(scope)+"/"+uriAdj(collection), nil, nil)
|
|
}
|
|
|
|
func (b *Bucket) FlushCollection(scope string, collection string) error {
|
|
b.RLock()
|
|
pool := b.pool
|
|
client := pool.client
|
|
b.RUnlock()
|
|
args := map[string]interface{}{"name": collection, "scope": scope}
|
|
return client.parsePostURLResponseTerse("/pools/default/buckets/"+uriAdj(b.Name)+"/collections-flush", args, nil)
|
|
}
|
|
|
|
func (b *Bucket) getMasterNode(i int) string {
|
|
p := b.getConnPools(false /* not already locked */)
|
|
if len(p) > i {
|
|
return p[i].host
|
|
}
|
|
return ""
|
|
}
|
|
|
|
func (b *Bucket) authHandler(bucketLocked bool) (ah AuthHandler) {
|
|
if !bucketLocked {
|
|
b.RLock()
|
|
defer b.RUnlock()
|
|
}
|
|
pool := b.pool
|
|
name := b.Name
|
|
|
|
if pool != nil {
|
|
ah = pool.client.ah
|
|
}
|
|
if mbah, ok := ah.(MultiBucketAuthHandler); ok {
|
|
return mbah.ForBucket(name)
|
|
}
|
|
if ah == nil {
|
|
ah = &basicAuth{name, ""}
|
|
}
|
|
return
|
|
}
|
|
|
|
// NodeAddresses gets the (sorted) list of memcached node addresses
|
|
// (hostname:port).
|
|
func (b *Bucket) NodeAddresses() []string {
|
|
vsm := b.VBServerMap()
|
|
rv := make([]string, len(vsm.ServerList))
|
|
copy(rv, vsm.ServerList)
|
|
sort.Strings(rv)
|
|
return rv
|
|
}
|
|
|
|
// CommonAddressSuffix finds the longest common suffix of all
|
|
// host:port strings in the node list.
|
|
func (b *Bucket) CommonAddressSuffix() string {
|
|
input := []string{}
|
|
for _, n := range b.Nodes() {
|
|
input = append(input, n.Hostname)
|
|
}
|
|
return FindCommonSuffix(input)
|
|
}
|
|
|
|
// A Client is the starting point for all services across all buckets
|
|
// in a Couchbase cluster.
|
|
type Client struct {
|
|
BaseURL *url.URL
|
|
ah AuthHandler
|
|
Info Pools
|
|
tlsConfig *tls.Config
|
|
}
|
|
|
|
func maybeAddAuth(req *http.Request, ah AuthHandler) error {
|
|
if hah, ok := ah.(HTTPAuthHandler); ok {
|
|
return hah.SetCredsForRequest(req)
|
|
}
|
|
if ah != nil {
|
|
user, pass, _ := ah.GetCredentials()
|
|
req.Header.Set("Authorization", "Basic "+
|
|
base64.StdEncoding.EncodeToString([]byte(user+":"+pass)))
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// arbitary number, may need to be tuned #FIXME
|
|
const HTTP_MAX_RETRY = 5
|
|
|
|
// Someday golang network packages will implement standard
|
|
// error codes. Until then #sigh
|
|
func isHttpConnError(err error) bool {
|
|
|
|
estr := err.Error()
|
|
return strings.Contains(estr, "broken pipe") ||
|
|
strings.Contains(estr, "broken connection") ||
|
|
strings.Contains(estr, "connection reset")
|
|
}
|
|
|
|
var client *http.Client
|
|
var clientForStreaming *http.Client
|
|
|
|
func ClientConfigForX509(certFile, keyFile, rootFile string) (*tls.Config, error) {
|
|
cfg := &tls.Config{}
|
|
|
|
if certFile != "" && keyFile != "" {
|
|
tlsCert, err := tls.LoadX509KeyPair(certFile, keyFile)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
cfg.Certificates = []tls.Certificate{tlsCert}
|
|
} else {
|
|
//error need to pass both certfile and keyfile
|
|
return nil, fmt.Errorf("N1QL: Need to pass both certfile and keyfile")
|
|
}
|
|
|
|
var caCert []byte
|
|
var err1 error
|
|
|
|
caCertPool := x509.NewCertPool()
|
|
if rootFile != "" {
|
|
// Read that value in
|
|
caCert, err1 = ioutil.ReadFile(rootFile)
|
|
if err1 != nil {
|
|
return nil, fmt.Errorf(" Error in reading cacert file, err: %v", err1)
|
|
}
|
|
caCertPool.AppendCertsFromPEM(caCert)
|
|
}
|
|
|
|
cfg.RootCAs = caCertPool
|
|
return cfg, nil
|
|
}
|
|
|
|
// This version of doHTTPRequest is for requests where the response connection is held open
|
|
// for an extended duration since line is a new and significant output.
|
|
//
|
|
// The ordinary version of this method expects the results to arrive promptly, and
|
|
// therefore use an HTTP client with a timeout. This client is not suitable
|
|
// for streaming use.
|
|
func doHTTPRequestForStreaming(req *http.Request) (*http.Response, error) {
|
|
var err error
|
|
var res *http.Response
|
|
|
|
// we need a client that ignores certificate errors, since we self-sign
|
|
// our certs
|
|
if clientForStreaming == nil && req.URL.Scheme == "https" {
|
|
var tr *http.Transport
|
|
|
|
if skipVerify {
|
|
tr = &http.Transport{
|
|
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
|
|
}
|
|
} else {
|
|
// Handle cases with cert
|
|
|
|
cfg, err := ClientConfigForX509(certFile, keyFile, rootFile)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
tr = &http.Transport{
|
|
TLSClientConfig: cfg,
|
|
}
|
|
}
|
|
|
|
clientForStreaming = &http.Client{Transport: tr, Timeout: 0}
|
|
|
|
} else if clientForStreaming == nil {
|
|
clientForStreaming = HTTPClientForStreaming
|
|
}
|
|
|
|
for i := 0; i < HTTP_MAX_RETRY; i++ {
|
|
res, err = clientForStreaming.Do(req)
|
|
if err != nil && isHttpConnError(err) {
|
|
continue
|
|
}
|
|
break
|
|
}
|
|
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return res, err
|
|
}
|
|
|
|
func doHTTPRequest(req *http.Request) (*http.Response, error) {
|
|
|
|
var err error
|
|
var res *http.Response
|
|
|
|
// we need a client that ignores certificate errors, since we self-sign
|
|
// our certs
|
|
if client == nil && req.URL.Scheme == "https" {
|
|
var tr *http.Transport
|
|
|
|
if skipVerify {
|
|
tr = &http.Transport{
|
|
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
|
|
}
|
|
} else {
|
|
// Handle cases with cert
|
|
|
|
cfg, err := ClientConfigForX509(certFile, keyFile, rootFile)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
tr = &http.Transport{
|
|
TLSClientConfig: cfg,
|
|
}
|
|
}
|
|
|
|
client = &http.Client{Transport: tr}
|
|
|
|
} else if client == nil {
|
|
client = HTTPClient
|
|
}
|
|
|
|
for i := 0; i < HTTP_MAX_RETRY; i++ {
|
|
res, err = client.Do(req)
|
|
if err != nil && isHttpConnError(err) {
|
|
continue
|
|
}
|
|
break
|
|
}
|
|
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return res, err
|
|
}
|
|
|
|
func doPutAPI(baseURL *url.URL, path string, params map[string]interface{}, authHandler AuthHandler, out interface{}, terse bool) error {
|
|
return doOutputAPI("PUT", baseURL, path, params, authHandler, out, terse)
|
|
}
|
|
|
|
func doPostAPI(baseURL *url.URL, path string, params map[string]interface{}, authHandler AuthHandler, out interface{}, terse bool) error {
|
|
return doOutputAPI("POST", baseURL, path, params, authHandler, out, terse)
|
|
}
|
|
|
|
func doDeleteAPI(baseURL *url.URL, path string, params map[string]interface{}, authHandler AuthHandler, out interface{}, terse bool) error {
|
|
return doOutputAPI("DELETE", baseURL, path, params, authHandler, out, terse)
|
|
}
|
|
|
|
func doOutputAPI(
|
|
httpVerb string,
|
|
baseURL *url.URL,
|
|
path string,
|
|
params map[string]interface{},
|
|
authHandler AuthHandler,
|
|
out interface{},
|
|
terse bool) error {
|
|
|
|
var requestUrl string
|
|
|
|
if q := strings.Index(path, "?"); q > 0 {
|
|
requestUrl = baseURL.Scheme + "://" + baseURL.Host + path[:q] + "?" + path[q+1:]
|
|
} else {
|
|
requestUrl = baseURL.Scheme + "://" + baseURL.Host + path
|
|
}
|
|
|
|
postData := url.Values{}
|
|
for k, v := range params {
|
|
postData.Set(k, fmt.Sprintf("%v", v))
|
|
}
|
|
|
|
req, err := http.NewRequest(httpVerb, requestUrl, bytes.NewBufferString(postData.Encode()))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
req.Header.Add("Content-Type", "application/x-www-form-urlencoded")
|
|
|
|
err = maybeAddAuth(req, authHandler)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
res, err := doHTTPRequest(req)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
defer res.Body.Close()
|
|
// 200 - ok, 202 - accepted (asynchronously)
|
|
if res.StatusCode != 200 && res.StatusCode != 202 {
|
|
bod, _ := ioutil.ReadAll(io.LimitReader(res.Body, 512))
|
|
if terse {
|
|
var outBuf interface{}
|
|
|
|
err := json.Unmarshal(bod, &outBuf)
|
|
if err == nil && outBuf != nil {
|
|
switch errText := outBuf.(type) {
|
|
case string:
|
|
return fmt.Errorf("%s", errText)
|
|
case map[string]interface{}:
|
|
errField := errText["errors"]
|
|
if errField != nil {
|
|
|
|
// remove annoying 'map' prefix
|
|
return fmt.Errorf("%s", strings.TrimPrefix(fmt.Sprintf("%v", errField), "map"))
|
|
}
|
|
}
|
|
}
|
|
return fmt.Errorf("%s", string(bod))
|
|
}
|
|
return fmt.Errorf("HTTP error %v getting %q: %s",
|
|
res.Status, requestUrl, bod)
|
|
}
|
|
|
|
d := json.NewDecoder(res.Body)
|
|
// PUT/POST/DELETE request may not have a response body
|
|
if d.More() {
|
|
if err = d.Decode(&out); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func queryRestAPI(
|
|
baseURL *url.URL,
|
|
path string,
|
|
authHandler AuthHandler,
|
|
out interface{},
|
|
terse bool) error {
|
|
|
|
var requestUrl string
|
|
|
|
if q := strings.Index(path, "?"); q > 0 {
|
|
requestUrl = baseURL.Scheme + "://" + baseURL.Host + path[:q] + "?" + path[q+1:]
|
|
} else {
|
|
requestUrl = baseURL.Scheme + "://" + baseURL.Host + path
|
|
}
|
|
|
|
req, err := http.NewRequest("GET", requestUrl, nil)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
err = maybeAddAuth(req, authHandler)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
res, err := doHTTPRequest(req)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
defer res.Body.Close()
|
|
if res.StatusCode != 200 {
|
|
bod, _ := ioutil.ReadAll(io.LimitReader(res.Body, 512))
|
|
if terse {
|
|
var outBuf interface{}
|
|
|
|
err := json.Unmarshal(bod, &outBuf)
|
|
if err == nil && outBuf != nil {
|
|
errText, ok := outBuf.(string)
|
|
if ok {
|
|
return fmt.Errorf(errText)
|
|
}
|
|
}
|
|
return fmt.Errorf(string(bod))
|
|
}
|
|
return fmt.Errorf("HTTP error %v getting %q: %s",
|
|
res.Status, requestUrl, bod)
|
|
}
|
|
|
|
d := json.NewDecoder(res.Body)
|
|
// GET request should have a response body
|
|
if err = d.Decode(&out); err != nil {
|
|
return fmt.Errorf("json decode err: %#v, for requestUrl: %s",
|
|
err, requestUrl)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (c *Client) ProcessStream(path string, callb func(interface{}) error, data interface{}) error {
|
|
return c.processStream(c.BaseURL, path, c.ah, callb, data)
|
|
}
|
|
|
|
// Based on code in http://src.couchbase.org/source/xref/trunk/goproj/src/github.com/couchbase/indexing/secondary/dcp/pools.go#309
|
|
func (c *Client) processStream(baseURL *url.URL, path string, authHandler AuthHandler, callb func(interface{}) error, data interface{}) error {
|
|
var requestUrl string
|
|
|
|
if q := strings.Index(path, "?"); q > 0 {
|
|
requestUrl = baseURL.Scheme + "://" + baseURL.Host + path[:q] + "?" + path[q+1:]
|
|
} else {
|
|
requestUrl = baseURL.Scheme + "://" + baseURL.Host + path
|
|
}
|
|
|
|
req, err := http.NewRequest("GET", requestUrl, nil)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
err = maybeAddAuth(req, authHandler)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
res, err := doHTTPRequestForStreaming(req)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
defer res.Body.Close()
|
|
if res.StatusCode != 200 {
|
|
bod, _ := ioutil.ReadAll(io.LimitReader(res.Body, 512))
|
|
return fmt.Errorf("HTTP error %v getting %q: %s",
|
|
res.Status, requestUrl, bod)
|
|
}
|
|
|
|
reader := bufio.NewReader(res.Body)
|
|
for {
|
|
bs, err := reader.ReadBytes('\n')
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if len(bs) == 1 && bs[0] == '\n' {
|
|
continue
|
|
}
|
|
|
|
err = json.Unmarshal(bs, data)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
err = callb(data)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
|
|
}
|
|
|
|
func (c *Client) parseURLResponse(path string, out interface{}) error {
|
|
return queryRestAPI(c.BaseURL, path, c.ah, out, false)
|
|
}
|
|
|
|
func (c *Client) parsePostURLResponse(path string, params map[string]interface{}, out interface{}) error {
|
|
return doPostAPI(c.BaseURL, path, params, c.ah, out, false)
|
|
}
|
|
|
|
func (c *Client) parsePostURLResponseTerse(path string, params map[string]interface{}, out interface{}) error {
|
|
return doPostAPI(c.BaseURL, path, params, c.ah, out, true)
|
|
}
|
|
|
|
func (c *Client) parseDeleteURLResponse(path string, params map[string]interface{}, out interface{}) error {
|
|
return doDeleteAPI(c.BaseURL, path, params, c.ah, out, false)
|
|
}
|
|
|
|
func (c *Client) parseDeleteURLResponseTerse(path string, params map[string]interface{}, out interface{}) error {
|
|
return doDeleteAPI(c.BaseURL, path, params, c.ah, out, true)
|
|
}
|
|
|
|
func (c *Client) parsePutURLResponse(path string, params map[string]interface{}, out interface{}) error {
|
|
return doPutAPI(c.BaseURL, path, params, c.ah, out, false)
|
|
}
|
|
|
|
func (c *Client) parsePutURLResponseTerse(path string, params map[string]interface{}, out interface{}) error {
|
|
return doPutAPI(c.BaseURL, path, params, c.ah, out, true)
|
|
}
|
|
|
|
func (b *Bucket) parseURLResponse(path string, out interface{}) error {
|
|
nodes := b.Nodes()
|
|
if len(nodes) == 0 {
|
|
return errors.New("no couch rest URLs")
|
|
}
|
|
|
|
// Pick a random node to start querying.
|
|
startNode := rand.Intn(len(nodes))
|
|
maxRetries := len(nodes)
|
|
for i := 0; i < maxRetries; i++ {
|
|
node := nodes[(startNode+i)%len(nodes)] // Wrap around the nodes list.
|
|
// Skip non-healthy nodes.
|
|
if node.Status != "healthy" || node.CouchAPIBase == "" {
|
|
continue
|
|
}
|
|
url := &url.URL{
|
|
Host: node.Hostname,
|
|
Scheme: "http",
|
|
}
|
|
|
|
// Lock here to avoid having pool closed under us.
|
|
b.RLock()
|
|
err := queryRestAPI(url, path, b.pool.client.ah, out, false)
|
|
b.RUnlock()
|
|
if err == nil {
|
|
return err
|
|
}
|
|
}
|
|
return errors.New("All nodes failed to respond or no healthy nodes for bucket found")
|
|
}
|
|
|
|
func (b *Bucket) parseAPIResponse(path string, out interface{}) error {
|
|
nodes := b.Nodes()
|
|
if len(nodes) == 0 {
|
|
return errors.New("no couch rest URLs")
|
|
}
|
|
|
|
var err error
|
|
var u *url.URL
|
|
|
|
// Pick a random node to start querying.
|
|
startNode := rand.Intn(len(nodes))
|
|
maxRetries := len(nodes)
|
|
for i := 0; i < maxRetries; i++ {
|
|
node := nodes[(startNode+i)%len(nodes)] // Wrap around the nodes list.
|
|
// Skip non-healthy nodes.
|
|
if node.Status != "healthy" || node.CouchAPIBase == "" {
|
|
continue
|
|
}
|
|
|
|
u, err = ParseURL(node.CouchAPIBase)
|
|
// Lock here so pool does not get closed under us.
|
|
b.RLock()
|
|
if err != nil {
|
|
b.RUnlock()
|
|
return fmt.Errorf("config error: Bucket %q node #%d CouchAPIBase=%q: %v",
|
|
b.Name, i, node.CouchAPIBase, err)
|
|
} else if b.pool != nil {
|
|
u.User = b.pool.client.BaseURL.User
|
|
}
|
|
u.Path = path
|
|
|
|
// generate the path so that the strings are properly escaped
|
|
// MB-13770
|
|
requestPath := strings.Split(u.String(), u.Host)[1]
|
|
|
|
err = queryRestAPI(u, requestPath, b.pool.client.ah, out, false)
|
|
b.RUnlock()
|
|
if err == nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
var errStr string
|
|
if err != nil {
|
|
errStr = "Error " + err.Error()
|
|
}
|
|
|
|
return errors.New("All nodes failed to respond or returned error or no healthy nodes for bucket found." + errStr)
|
|
}
|
|
|
|
type basicAuth struct {
|
|
u, p string
|
|
}
|
|
|
|
func (b basicAuth) GetCredentials() (string, string, string) {
|
|
return b.u, b.p, b.u
|
|
}
|
|
|
|
func basicAuthFromURL(us string) (ah AuthHandler) {
|
|
u, err := ParseURL(us)
|
|
if err != nil {
|
|
return
|
|
}
|
|
if user := u.User; user != nil {
|
|
pw, _ := user.Password()
|
|
ah = basicAuth{user.Username(), pw}
|
|
}
|
|
return
|
|
}
|
|
|
|
// ConnectWithAuth connects to a couchbase cluster with the given
|
|
// authentication handler.
|
|
func ConnectWithAuth(baseU string, ah AuthHandler) (c Client, err error) {
|
|
c.BaseURL, err = ParseURL(baseU)
|
|
if err != nil {
|
|
return
|
|
}
|
|
c.ah = ah
|
|
|
|
return c, c.parseURLResponse("/pools", &c.Info)
|
|
}
|
|
|
|
// Call this method with a TLS certificate file name to make communication
|
|
// with the KV engine encrypted.
|
|
//
|
|
// This method should be called immediately after a Connect*() method.
|
|
func (c *Client) InitTLS(certFile string) error {
|
|
serverCert, err := ioutil.ReadFile(certFile)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
CA_Pool := x509.NewCertPool()
|
|
CA_Pool.AppendCertsFromPEM(serverCert)
|
|
c.tlsConfig = &tls.Config{RootCAs: CA_Pool}
|
|
return nil
|
|
}
|
|
|
|
func (c *Client) ClearTLS() {
|
|
c.tlsConfig = nil
|
|
}
|
|
|
|
// ConnectWithAuthCreds connects to a couchbase cluster with the give
|
|
// authorization creds returned by cb_auth
|
|
func ConnectWithAuthCreds(baseU, username, password string) (c Client, err error) {
|
|
c.BaseURL, err = ParseURL(baseU)
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
c.ah = newBucketAuth(username, password, "")
|
|
return c, c.parseURLResponse("/pools", &c.Info)
|
|
}
|
|
|
|
// Connect to a couchbase cluster. An authentication handler will be
|
|
// created from the userinfo in the URL if provided.
|
|
func Connect(baseU string) (Client, error) {
|
|
return ConnectWithAuth(baseU, basicAuthFromURL(baseU))
|
|
}
|
|
|
|
type BucketInfo struct {
|
|
Name string // name of bucket
|
|
Password string // SASL password of bucket
|
|
}
|
|
|
|
//Get SASL buckets
|
|
func GetBucketList(baseU string) (bInfo []BucketInfo, err error) {
|
|
|
|
c := &Client{}
|
|
c.BaseURL, err = ParseURL(baseU)
|
|
if err != nil {
|
|
return
|
|
}
|
|
c.ah = basicAuthFromURL(baseU)
|
|
|
|
var buckets []Bucket
|
|
err = c.parseURLResponse("/pools/default/buckets", &buckets)
|
|
if err != nil {
|
|
return
|
|
}
|
|
bInfo = make([]BucketInfo, 0)
|
|
for _, bucket := range buckets {
|
|
bucketInfo := BucketInfo{Name: bucket.Name, Password: bucket.Password}
|
|
bInfo = append(bInfo, bucketInfo)
|
|
}
|
|
return bInfo, err
|
|
}
|
|
|
|
//Set viewUpdateDaemonOptions
|
|
func SetViewUpdateParams(baseU string, params map[string]interface{}) (viewOpts map[string]interface{}, err error) {
|
|
|
|
c := &Client{}
|
|
c.BaseURL, err = ParseURL(baseU)
|
|
if err != nil {
|
|
return
|
|
}
|
|
c.ah = basicAuthFromURL(baseU)
|
|
|
|
if len(params) < 1 {
|
|
return nil, fmt.Errorf("No params to set")
|
|
}
|
|
|
|
err = c.parsePostURLResponse("/settings/viewUpdateDaemon", params, &viewOpts)
|
|
if err != nil {
|
|
return
|
|
}
|
|
return viewOpts, err
|
|
}
|
|
|
|
// This API lets the caller know, if the list of nodes a bucket is
|
|
// connected to has gone through an edit (a rebalance operation)
|
|
// since the last update to the bucket, in which case a Refresh is
|
|
// advised.
|
|
func (b *Bucket) NodeListChanged() bool {
|
|
b.RLock()
|
|
pool := b.pool
|
|
uri := b.URI
|
|
b.RUnlock()
|
|
|
|
tmpb := &Bucket{}
|
|
err := pool.client.parseURLResponse(uri, tmpb)
|
|
if err != nil {
|
|
return true
|
|
}
|
|
|
|
bNodes := *(*[]Node)(b.nodeList)
|
|
if len(bNodes) != len(tmpb.NodesJSON) {
|
|
return true
|
|
}
|
|
|
|
bucketHostnames := map[string]bool{}
|
|
for _, node := range bNodes {
|
|
bucketHostnames[node.Hostname] = true
|
|
}
|
|
|
|
for _, node := range tmpb.NodesJSON {
|
|
if _, found := bucketHostnames[node.Hostname]; !found {
|
|
return true
|
|
}
|
|
}
|
|
|
|
return false
|
|
}
|
|
|
|
// Sample data for scopes and collections as returned from the
|
|
// /pooles/default/$BUCKET_NAME/collections API.
|
|
// {"myScope2":{"myCollectionC":{}},"myScope1":{"myCollectionB":{},"myCollectionA":{}},"_default":{"_default":{}}}
|
|
|
|
// Structures for parsing collections manifest.
|
|
// The map key is the name of the scope.
|
|
// Example data:
|
|
// {"uid":"b","scopes":[
|
|
// {"name":"_default","uid":"0","collections":[
|
|
// {"name":"_default","uid":"0"}]},
|
|
// {"name":"myScope1","uid":"8","collections":[
|
|
// {"name":"myCollectionB","uid":"c"},
|
|
// {"name":"myCollectionA","uid":"b"}]},
|
|
// {"name":"myScope2","uid":"9","collections":[
|
|
// {"name":"myCollectionC","uid":"d"}]}]}
|
|
type InputManifest struct {
|
|
Uid string
|
|
Scopes []InputScope
|
|
}
|
|
type InputScope struct {
|
|
Name string
|
|
Uid string
|
|
Collections []InputCollection
|
|
}
|
|
type InputCollection struct {
|
|
Name string
|
|
Uid string
|
|
}
|
|
|
|
// Structures for storing collections information.
|
|
type Manifest struct {
|
|
Uid uint64
|
|
Scopes map[string]*Scope // map by name
|
|
}
|
|
type Scope struct {
|
|
Name string
|
|
Uid uint64
|
|
Collections map[string]*Collection // map by name
|
|
}
|
|
type Collection struct {
|
|
Name string
|
|
Uid uint64
|
|
}
|
|
|
|
var _EMPTY_MANIFEST *Manifest = &Manifest{Uid: 0, Scopes: map[string]*Scope{}}
|
|
|
|
func parseCollectionsManifest(res *gomemcached.MCResponse) (*Manifest, error) {
|
|
if !EnableCollections {
|
|
return _EMPTY_MANIFEST, nil
|
|
}
|
|
|
|
var im InputManifest
|
|
err := json.Unmarshal(res.Body, &im)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
uid, err := strconv.ParseUint(im.Uid, 16, 64)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
mani := &Manifest{Uid: uid, Scopes: make(map[string]*Scope, len(im.Scopes))}
|
|
for _, iscope := range im.Scopes {
|
|
scope_uid, err := strconv.ParseUint(iscope.Uid, 16, 64)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
scope := &Scope{Uid: scope_uid, Name: iscope.Name, Collections: make(map[string]*Collection, len(iscope.Collections))}
|
|
mani.Scopes[iscope.Name] = scope
|
|
for _, icoll := range iscope.Collections {
|
|
coll_uid, err := strconv.ParseUint(icoll.Uid, 16, 64)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
coll := &Collection{Uid: coll_uid, Name: icoll.Name}
|
|
scope.Collections[icoll.Name] = coll
|
|
}
|
|
}
|
|
|
|
return mani, nil
|
|
}
|
|
|
|
// This function assumes the bucket is locked.
|
|
func (b *Bucket) GetCollectionsManifest() (*Manifest, error) {
|
|
// Collections not used?
|
|
if !EnableCollections {
|
|
return nil, fmt.Errorf("Collections not enabled.")
|
|
}
|
|
|
|
b.RLock()
|
|
pools := b.getConnPools(true /* already locked */)
|
|
pool := pools[0] // Any pool will do, so use the first one.
|
|
b.RUnlock()
|
|
client, err := pool.Get()
|
|
if err != nil {
|
|
return nil, fmt.Errorf("Unable to get connection to retrieve collections manifest: %v. No collections access to bucket %s.", err, b.Name)
|
|
}
|
|
client.SetDeadline(getDeadline(time.Time{}, DefaultTimeout))
|
|
|
|
// We need to select the bucket before GetCollectionsManifest()
|
|
// will work. This is sometimes done at startup (see defaultMkConn())
|
|
// but not always, depending on the auth type.
|
|
// Doing this is safe because we collect the the connections
|
|
// by bucket, so the bucket being selected will never change.
|
|
_, err = client.SelectBucket(b.Name)
|
|
if err != nil {
|
|
pool.Return(client)
|
|
return nil, fmt.Errorf("Unable to select bucket %s: %v. No collections access to bucket %s.", err, b.Name, b.Name)
|
|
}
|
|
|
|
res, err := client.GetCollectionsManifest()
|
|
if err != nil {
|
|
pool.Return(client)
|
|
return nil, fmt.Errorf("Unable to retrieve collections manifest: %v. No collections access to bucket %s.", err, b.Name)
|
|
}
|
|
mani, err := parseCollectionsManifest(res)
|
|
if err != nil {
|
|
pool.Return(client)
|
|
return nil, fmt.Errorf("Unable to parse collections manifest: %v. No collections access to bucket %s.", err, b.Name)
|
|
}
|
|
|
|
pool.Return(client)
|
|
return mani, nil
|
|
}
|
|
|
|
func (b *Bucket) RefreshFully() error {
|
|
return b.refresh(false)
|
|
}
|
|
|
|
func (b *Bucket) Refresh() error {
|
|
return b.refresh(true)
|
|
}
|
|
|
|
func (b *Bucket) refresh(preserveConnections bool) error {
|
|
b.RLock()
|
|
pool := b.pool
|
|
uri := b.URI
|
|
client := pool.client
|
|
b.RUnlock()
|
|
|
|
var poolServices PoolServices
|
|
var err error
|
|
if client.tlsConfig != nil {
|
|
poolServices, err = client.GetPoolServices("default")
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
tmpb := &Bucket{}
|
|
err = pool.client.parseURLResponse(uri, tmpb)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
pools := b.getConnPools(false /* bucket not already locked */)
|
|
|
|
// We need this lock to ensure that bucket refreshes happening because
|
|
// of NMVb errors received during bulkGet do not end up over-writing
|
|
// pool.inUse.
|
|
b.Lock()
|
|
|
|
for _, pool := range pools {
|
|
if pool != nil {
|
|
pool.inUse = false
|
|
}
|
|
}
|
|
|
|
newcps := make([]*connectionPool, len(tmpb.VBSMJson.ServerList))
|
|
for i := range newcps {
|
|
hostport := tmpb.VBSMJson.ServerList[i]
|
|
if preserveConnections {
|
|
pool := b.getConnPoolByHost(hostport, true /* bucket already locked */)
|
|
if pool != nil && pool.inUse == false && (!pool.encrypted || pool.tlsConfig == client.tlsConfig) {
|
|
// if the hostname and index is unchanged then reuse this pool
|
|
newcps[i] = pool
|
|
pool.inUse = true
|
|
continue
|
|
}
|
|
}
|
|
|
|
var encrypted bool
|
|
if client.tlsConfig != nil {
|
|
hostport, encrypted, err = MapKVtoSSL(hostport, &poolServices)
|
|
if err != nil {
|
|
b.Unlock()
|
|
return err
|
|
}
|
|
}
|
|
|
|
if b.ah != nil {
|
|
newcps[i] = newConnectionPool(hostport,
|
|
b.ah, AsynchronousCloser, PoolSize, PoolOverflow, client.tlsConfig, b.Name, encrypted)
|
|
|
|
} else {
|
|
newcps[i] = newConnectionPool(hostport,
|
|
b.authHandler(true /* bucket already locked */),
|
|
AsynchronousCloser, PoolSize, PoolOverflow, client.tlsConfig, b.Name, encrypted)
|
|
}
|
|
}
|
|
b.replaceConnPools2(newcps, true /* bucket already locked */)
|
|
tmpb.ah = b.ah
|
|
b.vBucketServerMap = unsafe.Pointer(&tmpb.VBSMJson)
|
|
b.nodeList = unsafe.Pointer(&tmpb.NodesJSON)
|
|
|
|
b.Unlock()
|
|
return nil
|
|
}
|
|
|
|
func (p *Pool) refresh() (err error) {
|
|
p.BucketMap = make(map[string]*Bucket)
|
|
|
|
buckets := []Bucket{}
|
|
err = p.client.parseURLResponse(p.BucketURL["uri"], &buckets)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
for i, _ := range buckets {
|
|
b := new(Bucket)
|
|
*b = buckets[i]
|
|
b.pool = p
|
|
b.nodeList = unsafe.Pointer(&b.NodesJSON)
|
|
|
|
// MB-33185 this is merely defensive, just in case
|
|
// refresh() gets called on a perfectly node pool
|
|
ob, ok := p.BucketMap[b.Name]
|
|
if ok && ob.connPools != nil {
|
|
ob.Close()
|
|
}
|
|
b.replaceConnPools(make([]*connectionPool, len(b.VBSMJson.ServerList)))
|
|
p.BucketMap[b.Name] = b
|
|
runtime.SetFinalizer(b, bucketFinalizer)
|
|
}
|
|
buckets = nil
|
|
return nil
|
|
}
|
|
|
|
// GetPool gets a pool from within the couchbase cluster (usually
|
|
// "default").
|
|
func (c *Client) GetPool(name string) (p Pool, err error) {
|
|
var poolURI string
|
|
|
|
for _, p := range c.Info.Pools {
|
|
if p.Name == name {
|
|
poolURI = p.URI
|
|
break
|
|
}
|
|
}
|
|
if poolURI == "" {
|
|
return p, errors.New("No pool named " + name)
|
|
}
|
|
|
|
err = c.parseURLResponse(poolURI, &p)
|
|
if err != nil {
|
|
return p, err
|
|
}
|
|
|
|
p.client = c
|
|
|
|
err = p.refresh()
|
|
return
|
|
}
|
|
|
|
// GetPoolServices returns all the bucket-independent services in a pool.
|
|
// (See "Exposing services outside of bucket context" in http://goo.gl/uuXRkV)
|
|
func (c *Client) GetPoolServices(name string) (ps PoolServices, err error) {
|
|
var poolName string
|
|
for _, p := range c.Info.Pools {
|
|
if p.Name == name {
|
|
poolName = p.Name
|
|
}
|
|
}
|
|
if poolName == "" {
|
|
return ps, errors.New("No pool named " + name)
|
|
}
|
|
|
|
poolURI := "/pools/" + poolName + "/nodeServices"
|
|
err = c.parseURLResponse(poolURI, &ps)
|
|
|
|
return
|
|
}
|
|
|
|
func (b *Bucket) GetPoolServices(name string) (*PoolServices, error) {
|
|
b.RLock()
|
|
pool := b.pool
|
|
b.RUnlock()
|
|
|
|
ps, err := pool.client.GetPoolServices(name)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return &ps, nil
|
|
}
|
|
|
|
// Close marks this bucket as no longer needed, closing connections it
|
|
// may have open.
|
|
func (b *Bucket) Close() {
|
|
b.Lock()
|
|
defer b.Unlock()
|
|
if b.connPools != nil {
|
|
for _, c := range b.getConnPools(true /* already locked */) {
|
|
if c != nil {
|
|
c.Close()
|
|
}
|
|
}
|
|
b.connPools = nil
|
|
}
|
|
}
|
|
|
|
func bucketFinalizer(b *Bucket) {
|
|
if b.connPools != nil {
|
|
if !b.closed {
|
|
logging.Warnf("Finalizing a bucket with active connections.")
|
|
}
|
|
|
|
// MB-33185 do not leak connection pools
|
|
b.Close()
|
|
}
|
|
}
|
|
|
|
// GetBucket gets a bucket from within this pool.
|
|
func (p *Pool) GetBucket(name string) (*Bucket, error) {
|
|
rv, ok := p.BucketMap[name]
|
|
if !ok {
|
|
return nil, &BucketNotFoundError{bucket: name}
|
|
}
|
|
err := rv.Refresh()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return rv, nil
|
|
}
|
|
|
|
// GetBucket gets a bucket from within this pool.
|
|
func (p *Pool) GetBucketWithAuth(bucket, username, password string) (*Bucket, error) {
|
|
rv, ok := p.BucketMap[bucket]
|
|
if !ok {
|
|
return nil, &BucketNotFoundError{bucket: bucket}
|
|
}
|
|
rv.ah = newBucketAuth(username, password, bucket)
|
|
err := rv.Refresh()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return rv, nil
|
|
}
|
|
|
|
// GetPool gets the pool to which this bucket belongs.
|
|
func (b *Bucket) GetPool() *Pool {
|
|
b.RLock()
|
|
defer b.RUnlock()
|
|
ret := b.pool
|
|
return ret
|
|
}
|
|
|
|
// GetClient gets the client from which we got this pool.
|
|
func (p *Pool) GetClient() *Client {
|
|
return p.client
|
|
}
|
|
|
|
// Release bucket connections when the pool is no longer in use
|
|
func (p *Pool) Close() {
|
|
|
|
// MB-36186 make the bucket map inaccessible
|
|
bucketMap := p.BucketMap
|
|
p.BucketMap = nil
|
|
|
|
// fine to loop through the buckets unlocked
|
|
// locking happens at the bucket level
|
|
for b, _ := range bucketMap {
|
|
|
|
// MB-36186 make the bucket unreachable and avoid concurrent read/write map panics
|
|
bucket := bucketMap[b]
|
|
bucketMap[b] = nil
|
|
|
|
bucket.Lock()
|
|
|
|
// MB-33208 defer closing connection pools until the bucket is no longer used
|
|
// MB-36186 if the bucket is unused make it unreachable straight away
|
|
needClose := bucket.connPools == nil && !bucket.closed
|
|
if needClose {
|
|
runtime.SetFinalizer(&bucket, nil)
|
|
}
|
|
bucket.closed = true
|
|
bucket.Unlock()
|
|
if needClose {
|
|
bucket.Close()
|
|
}
|
|
}
|
|
}
|
|
|
|
// GetBucket is a convenience function for getting a named bucket from
|
|
// a URL
|
|
func GetBucket(endpoint, poolname, bucketname string) (*Bucket, error) {
|
|
var err error
|
|
client, err := Connect(endpoint)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
pool, err := client.GetPool(poolname)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return pool.GetBucket(bucketname)
|
|
}
|
|
|
|
// ConnectWithAuthAndGetBucket is a convenience function for
|
|
// getting a named bucket from a given URL and an auth callback
|
|
func ConnectWithAuthAndGetBucket(endpoint, poolname, bucketname string,
|
|
ah AuthHandler) (*Bucket, error) {
|
|
client, err := ConnectWithAuth(endpoint, ah)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
pool, err := client.GetPool(poolname)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return pool.GetBucket(bucketname)
|
|
}
|
|
|
|
func GetSystemBucket(c *Client, p *Pool, name string) (*Bucket, error) {
|
|
bucket, err := p.GetBucket(name)
|
|
if err != nil {
|
|
if _, ok := err.(*BucketNotFoundError); !ok {
|
|
return nil, err
|
|
}
|
|
|
|
// create the bucket if not found
|
|
args := map[string]interface{}{
|
|
"authType": "sasl",
|
|
"bucketType": "couchbase",
|
|
"name": name,
|
|
"ramQuotaMB": 100,
|
|
"saslPassword": "donotuse",
|
|
}
|
|
var ret interface{}
|
|
// allow "bucket already exists" error in case duplicate create
|
|
// (e.g. two query nodes starting at same time)
|
|
err = c.parsePostURLResponseTerse("/pools/default/buckets", args, &ret)
|
|
if err != nil && !AlreadyExistsError(err) {
|
|
return nil, err
|
|
}
|
|
|
|
// bucket created asynchronously, try to get the bucket
|
|
maxRetry := 8
|
|
interval := 100 * time.Millisecond
|
|
for i := 0; i < maxRetry; i++ {
|
|
time.Sleep(interval)
|
|
interval *= 2
|
|
err = p.refresh()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
bucket, err = p.GetBucket(name)
|
|
if bucket != nil {
|
|
bucket.RLock()
|
|
ok := !bucket.closed && len(bucket.getConnPools(true /* already locked */)) > 0
|
|
bucket.RUnlock()
|
|
if ok {
|
|
break
|
|
}
|
|
} else if err != nil {
|
|
if _, ok := err.(*BucketNotFoundError); !ok {
|
|
break
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
return bucket, err
|
|
}
|
|
|
|
func DropSystemBucket(c *Client, name string) error {
|
|
err := c.parseDeleteURLResponseTerse("/pools/default/buckets/"+name, nil, nil)
|
|
return err
|
|
}
|
|
|
|
func AlreadyExistsError(err error) bool {
|
|
// Bucket error: Bucket with given name already exists
|
|
// Scope error: Scope with this name already exists
|
|
// Collection error: Collection with this name already exists
|
|
return strings.Contains(err.Error(), " name already exists")
|
|
}
|