k3s/vendor/google.golang.org/grpc/resolver_conn_wrapper.go

178 lines
5.6 KiB
Go
Raw Normal View History

2019-01-12 04:58:27 +00:00
/*
*
* Copyright 2017 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package grpc
import (
"fmt"
"strings"
2020-01-21 18:14:47 +00:00
"sync"
2019-01-12 04:58:27 +00:00
2020-01-21 18:14:47 +00:00
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/credentials"
2019-09-27 21:51:53 +00:00
"google.golang.org/grpc/internal/channelz"
2020-01-21 18:14:47 +00:00
"google.golang.org/grpc/internal/grpcsync"
2019-01-12 04:58:27 +00:00
"google.golang.org/grpc/resolver"
2020-01-21 18:14:47 +00:00
"google.golang.org/grpc/serviceconfig"
2019-01-12 04:58:27 +00:00
)
// ccResolverWrapper is a wrapper on top of cc for resolvers.
// It implements resolver.ClientConn interface.
2019-01-12 04:58:27 +00:00
type ccResolverWrapper struct {
2020-01-21 18:14:47 +00:00
cc *ClientConn
resolverMu sync.Mutex
resolver resolver.Resolver
done *grpcsync.Event
curState resolver.State
2019-01-12 04:58:27 +00:00
}
// newCCResolverWrapper uses the resolver.Builder to build a Resolver and
// returns a ccResolverWrapper object which wraps the newly built resolver.
func newCCResolverWrapper(cc *ClientConn, rb resolver.Builder) (*ccResolverWrapper, error) {
2019-01-12 04:58:27 +00:00
ccr := &ccResolverWrapper{
2020-01-21 18:14:47 +00:00
cc: cc,
done: grpcsync.NewEvent(),
}
var credsClone credentials.TransportCredentials
if creds := cc.dopts.copts.TransportCredentials; creds != nil {
credsClone = creds.Clone()
}
2020-03-26 21:07:15 +00:00
rbo := resolver.BuildOptions{
2020-01-21 18:14:47 +00:00
DisableServiceConfig: cc.dopts.disableServiceConfig,
DialCreds: credsClone,
CredsBundle: cc.dopts.copts.CredsBundle,
Dialer: cc.dopts.copts.Dialer,
2019-01-12 04:58:27 +00:00
}
var err error
2020-01-21 18:14:47 +00:00
// We need to hold the lock here while we assign to the ccr.resolver field
// to guard against a data race caused by the following code path,
// rb.Build-->ccr.ReportError-->ccr.poll-->ccr.resolveNow, would end up
// accessing ccr.resolver which is being assigned here.
ccr.resolverMu.Lock()
2020-03-26 21:07:15 +00:00
defer ccr.resolverMu.Unlock()
2020-01-21 18:14:47 +00:00
ccr.resolver, err = rb.Build(cc.parsedTarget, ccr, rbo)
2019-01-12 04:58:27 +00:00
if err != nil {
return nil, err
}
return ccr, nil
}
2020-03-26 21:07:15 +00:00
func (ccr *ccResolverWrapper) resolveNow(o resolver.ResolveNowOptions) {
2020-01-21 18:14:47 +00:00
ccr.resolverMu.Lock()
if !ccr.done.HasFired() {
ccr.resolver.ResolveNow(o)
}
ccr.resolverMu.Unlock()
2019-09-05 18:55:53 +00:00
}
2019-09-25 05:00:42 +00:00
func (ccr *ccResolverWrapper) close() {
2020-01-21 18:14:47 +00:00
ccr.resolverMu.Lock()
2019-09-25 05:00:42 +00:00
ccr.resolver.Close()
2020-01-21 18:14:47 +00:00
ccr.done.Fire()
ccr.resolverMu.Unlock()
2019-01-12 04:58:27 +00:00
}
func (ccr *ccResolverWrapper) UpdateState(s resolver.State) error {
2020-01-21 18:14:47 +00:00
if ccr.done.HasFired() {
return nil
2019-09-27 21:51:53 +00:00
}
channelz.Infof(logger, ccr.cc.channelzID, "ccResolverWrapper: sending update to cc: %v", s)
2019-09-27 21:51:53 +00:00
if channelz.IsOn() {
ccr.addChannelzTraceEvent(s)
}
ccr.curState = s
if err := ccr.cc.updateResolverState(ccr.curState, nil); err == balancer.ErrBadResolverState {
return balancer.ErrBadResolverState
}
return nil
2020-01-21 18:14:47 +00:00
}
func (ccr *ccResolverWrapper) ReportError(err error) {
if ccr.done.HasFired() {
return
}
channelz.Warningf(logger, ccr.cc.channelzID, "ccResolverWrapper: reporting error to cc: %v", err)
ccr.cc.updateResolverState(resolver.State{}, err)
2019-09-27 21:51:53 +00:00
}
// NewAddress is called by the resolver implementation to send addresses to gRPC.
2019-01-12 04:58:27 +00:00
func (ccr *ccResolverWrapper) NewAddress(addrs []resolver.Address) {
2020-01-21 18:14:47 +00:00
if ccr.done.HasFired() {
2019-09-27 21:51:53 +00:00
return
}
channelz.Infof(logger, ccr.cc.channelzID, "ccResolverWrapper: sending new addresses to cc: %v", addrs)
2019-09-27 21:51:53 +00:00
if channelz.IsOn() {
ccr.addChannelzTraceEvent(resolver.State{Addresses: addrs, ServiceConfig: ccr.curState.ServiceConfig})
2019-01-12 04:58:27 +00:00
}
2019-09-27 21:51:53 +00:00
ccr.curState.Addresses = addrs
ccr.cc.updateResolverState(ccr.curState, nil)
2019-01-12 04:58:27 +00:00
}
2019-09-27 21:51:53 +00:00
// NewServiceConfig is called by the resolver implementation to send service
2019-09-05 18:55:53 +00:00
// configs to gRPC.
2019-01-12 04:58:27 +00:00
func (ccr *ccResolverWrapper) NewServiceConfig(sc string) {
2020-01-21 18:14:47 +00:00
if ccr.done.HasFired() {
2019-09-27 21:51:53 +00:00
return
}
channelz.Infof(logger, ccr.cc.channelzID, "ccResolverWrapper: got new service config: %v", sc)
2020-03-26 21:07:15 +00:00
if ccr.cc.dopts.disableServiceConfig {
channelz.Info(logger, ccr.cc.channelzID, "Service config lookups disabled; ignoring config")
2020-03-26 21:07:15 +00:00
return
}
2020-01-21 18:14:47 +00:00
scpr := parseServiceConfig(sc)
if scpr.Err != nil {
channelz.Warningf(logger, ccr.cc.channelzID, "ccResolverWrapper: error parsing service config: %v", scpr.Err)
2019-09-27 21:51:53 +00:00
return
}
if channelz.IsOn() {
2020-01-21 18:14:47 +00:00
ccr.addChannelzTraceEvent(resolver.State{Addresses: ccr.curState.Addresses, ServiceConfig: scpr})
2019-09-27 21:51:53 +00:00
}
2020-01-21 18:14:47 +00:00
ccr.curState.ServiceConfig = scpr
ccr.cc.updateResolverState(ccr.curState, nil)
2020-01-21 18:14:47 +00:00
}
func (ccr *ccResolverWrapper) ParseServiceConfig(scJSON string) *serviceconfig.ParseResult {
return parseServiceConfig(scJSON)
2019-09-27 21:51:53 +00:00
}
func (ccr *ccResolverWrapper) addChannelzTraceEvent(s resolver.State) {
var updates []string
2020-01-21 18:14:47 +00:00
var oldSC, newSC *ServiceConfig
var oldOK, newOK bool
if ccr.curState.ServiceConfig != nil {
oldSC, oldOK = ccr.curState.ServiceConfig.Config.(*ServiceConfig)
}
if s.ServiceConfig != nil {
newSC, newOK = s.ServiceConfig.Config.(*ServiceConfig)
}
2019-09-27 21:51:53 +00:00
if oldOK != newOK || (oldOK && newOK && oldSC.rawJSONString != newSC.rawJSONString) {
updates = append(updates, "service config updated")
}
if len(ccr.curState.Addresses) > 0 && len(s.Addresses) == 0 {
updates = append(updates, "resolver returned an empty address list")
} else if len(ccr.curState.Addresses) == 0 && len(s.Addresses) > 0 {
updates = append(updates, "resolver returned new addresses")
2019-01-12 04:58:27 +00:00
}
channelz.AddTraceEvent(logger, ccr.cc.channelzID, 0, &channelz.TraceEventDesc{
2019-09-27 21:51:53 +00:00
Desc: fmt.Sprintf("Resolver state updated: %+v (%v)", s, strings.Join(updates, "; ")),
Severity: channelz.CtInfo,
2019-09-27 21:51:53 +00:00
})
2019-01-12 04:58:27 +00:00
}