diff --git a/go-client/pegasus/table_connector.go b/go-client/pegasus/table_connector.go
index d1074a7bf3..c83461e393 100644
--- a/go-client/pegasus/table_connector.go
+++ b/go-client/pegasus/table_connector.go
@@ -703,6 +703,7 @@ func (p *pegasusTableConnector) handleReplicaError(err error, replica *session.R
 
     case base.ERR_TIMEOUT:
     case context.DeadlineExceeded:
+        confUpdate = true
     case context.Canceled:
         // timeout will not trigger a configuration update
 
diff --git a/go-client/pegasus/table_connector_test.go b/go-client/pegasus/table_connector_test.go
index 1b28747655..b4016748ea 100644
--- a/go-client/pegasus/table_connector_test.go
+++ b/go-client/pegasus/table_connector_test.go
@@ -269,8 +269,14 @@ func TestPegasusTableConnector_TriggerSelfUpdate(t *testing.T) {
     assert.True(t, confUpdate)
     assert.False(t, retry)
 
+    confUpdate, retry, err = ptb.handleReplicaError(context.DeadlineExceeded, nil)
+    <-ptb.confUpdateCh
+    assert.Error(t, err)
+    assert.True(t, confUpdate)
+    assert.False(t, retry)
+
     { // Ensure: The following errors should not trigger configuration update
-        errorTypes := []error{base.ERR_TIMEOUT, context.DeadlineExceeded, base.ERR_CAPACITY_EXCEEDED, base.ERR_NOT_ENOUGH_MEMBER, base.ERR_BUSY, base.ERR_SPLITTING, base.ERR_DISK_INSUFFICIENT}
+        errorTypes := []error{base.ERR_TIMEOUT, base.ERR_CAPACITY_EXCEEDED, base.ERR_NOT_ENOUGH_MEMBER, base.ERR_BUSY, base.ERR_SPLITTING, base.ERR_DISK_INSUFFICIENT}
         for _, err := range errorTypes {
             channelEmpty := false
diff --git a/go-client/session/meta_call.go b/go-client/session/meta_call.go
index 2db6179ab1..d846aa09b8 100644
--- a/go-client/session/meta_call.go
+++ b/go-client/session/meta_call.go
@@ -26,6 +26,8 @@ import (
     "time"
 
     "github.com/apache/incubator-pegasus/go-client/idl/base"
+    "github.com/apache/incubator-pegasus/go-client/idl/replication"
+    "github.com/apache/incubator-pegasus/go-client/pegalog"
 )
 
 type metaCallFunc func(context.Context, *metaSession) (metaResponse, error)
@@ -42,21 +44,24 @@ type metaCall struct {
     backupCh chan interface{}
     callFunc metaCallFunc
 
-    metas []*metaSession
-    lead  int
+    metaIPAddrs []string
+    metas       []*metaSession
+    lead        int
     // After a Run successfully ends, the current leader will be set in this field.
     // If there is no meta failover, `newLead` equals to `lead`.
     newLead uint32
+    lock    sync.RWMutex
 }
 
-func newMetaCall(lead int, metas []*metaSession, callFunc metaCallFunc) *metaCall {
+func newMetaCall(lead int, metas []*metaSession, callFunc metaCallFunc, metaIPAddr []string) *metaCall {
     return &metaCall{
-        metas:    metas,
-        lead:     lead,
-        newLead:  uint32(lead),
-        respCh:   make(chan metaResponse),
-        callFunc: callFunc,
-        backupCh: make(chan interface{}),
+        metas:       metas,
+        metaIPAddrs: metaIPAddr,
+        lead:        lead,
+        newLead:     uint32(lead),
+        respCh:      make(chan metaResponse),
+        callFunc:    callFunc,
+        backupCh:    make(chan interface{}),
     }
 }
 
@@ -106,14 +111,44 @@ func (c *metaCall) Run(ctx context.Context) (metaResponse, error) {
 }
 
 // issueSingleMeta returns false if we should try another meta
-func (c *metaCall) issueSingleMeta(ctx context.Context, i int) bool {
-    meta := c.metas[i]
+func (c *metaCall) issueSingleMeta(ctx context.Context, curLeader int) bool {
+    meta := c.metas[curLeader]
     resp, err := c.callFunc(ctx, meta)
+
+    if err == nil && resp.GetErr().Errno == base.ERR_FORWARD_TO_OTHERS.String() {
+        forwardAddr := c.getMetaServiceForwardAddress(resp)
+        if forwardAddr == nil {
+            return false
+        }
+        addr := forwardAddr.GetAddress()
+        found := false
+        c.lock.Lock()
+        for i := range c.metaIPAddrs {
+            if addr == c.metaIPAddrs[i] {
+                found = true
+                break
+            }
+        }
+        c.lock.Unlock()
+        if !found {
+            c.lock.Lock()
+            c.metaIPAddrs = append(c.metaIPAddrs, addr)
+            c.metas = append(c.metas, &metaSession{
+                NodeSession: newNodeSession(addr, NodeTypeMeta),
+                logger:      pegalog.GetLogger(),
+            })
+            c.lock.Unlock()
+            curLeader = len(c.metas) - 1
+            c.metas[curLeader].logger.Printf("add forward address %s as meta server", addr)
+            resp, err = c.callFunc(ctx, c.metas[curLeader])
+        }
+    }
+
     if err != nil || resp.GetErr().Errno == base.ERR_FORWARD_TO_OTHERS.String() {
         return false
     }
     // the RPC succeeds, this meta becomes the new leader now.
-    atomic.StoreUint32(&c.newLead, uint32(i))
+    atomic.StoreUint32(&c.newLead, uint32(curLeader))
     select {
     case <-ctx.Done():
     case c.respCh <- resp:
@@ -133,3 +168,14 @@ func (c *metaCall) issueBackupMetas(ctx context.Context) {
         }(i)
     }
 }
+
+func (c *metaCall) getMetaServiceForwardAddress(resp metaResponse) *base.RPCAddress {
+    rep, ok := resp.(*replication.QueryCfgResponse)
+    if !ok || rep.GetErr().Errno != base.ERR_FORWARD_TO_OTHERS.String() {
+        return nil
+    } else if rep.GetPartitions() == nil || len(rep.GetPartitions()) == 0 {
+        return nil
+    } else {
+        return rep.Partitions[0].Primary
+    }
+}
diff --git a/go-client/session/meta_session.go b/go-client/session/meta_session.go
index c209cb8488..b0e962d1d9 100644
--- a/go-client/session/meta_session.go
+++ b/go-client/session/meta_session.go
@@ -94,10 +94,12 @@ func NewMetaManager(addrs []string, creator NodeSessionCreator) *MetaManager {
 
 func (m *MetaManager) call(ctx context.Context, callFunc metaCallFunc) (metaResponse, error) {
     lead := m.getCurrentLeader()
-    call := newMetaCall(lead, m.metas, callFunc)
+    call := newMetaCall(lead, m.metas, callFunc, m.metaIPAddrs)
     resp, err := call.Run(ctx)
     if err == nil {
         m.setCurrentLeader(int(call.newLead))
+        m.setNewMetas(call.metas)
+        m.setMetaIPAddrs(call.metaIPAddrs)
     }
     return resp, err
 }
@@ -131,6 +133,20 @@ func (m *MetaManager) setCurrentLeader(lead int) {
     m.currentLeader = lead
 }
 
+func (m *MetaManager) setNewMetas(metas []*metaSession) {
+    m.mu.Lock()
+    defer m.mu.Unlock()
+
+    m.metas = metas
+}
+
+func (m *MetaManager) setMetaIPAddrs(metaIPAddrs []string) {
+    m.mu.Lock()
+    defer m.mu.Unlock()
+
+    m.metaIPAddrs = metaIPAddrs
+}
+
 // Close the sessions.
 func (m *MetaManager) Close() error {
     funcs := make([]func() error, len(m.metas))
diff --git a/go-client/session/meta_session_test.go b/go-client/session/meta_session_test.go
index d2cbf6cc3d..5014a4680e 100644
--- a/go-client/session/meta_session_test.go
+++ b/go-client/session/meta_session_test.go
@@ -118,7 +118,7 @@ func TestMetaManager_FirstMetaDead(t *testing.T) {
     for i := 0; i < 3; i++ {
         call := newMetaCall(mm.currentLeader, mm.metas, func(rpcCtx context.Context, ms *metaSession) (metaResponse, error) {
             return ms.queryConfig(rpcCtx, "temp")
-        })
+        }, []string{"0.0.0.0:12345", "0.0.0.0:34603", "0.0.0.0:34602", "0.0.0.0:34601"})
         // This a trick for testing. If metaCall issue to other meta, not only to the leader, this nil channel will cause panic.
         call.backupCh = nil
         metaResp, err := call.Run(context.Background())
         assert.Nil(t, err)
         assert.Equal(t, metaResp.GetErr().Errno, base.ERR_OK.String())
     }
 }
+
+// This case mocks the scenario where the primary meta server is not in the client's meta list.
+// The client should forward to the primary meta automatically.
+func TestNodeSession_ForwardToPrimaryMeta(t *testing.T) {
+    defer leaktest.Check(t)()
+
+    metaList := []string{"0.0.0.0:34601", "0.0.0.0:34602", "0.0.0.0:34603"}
+
+    for i := 0; i < 3; i++ {
+        mm := NewMetaManager(metaList[i:i+1], NewNodeSession)
+        defer mm.Close()
+        resp, err := mm.QueryConfig(context.Background(), "temp")
+        assert.Nil(t, err)
+        assert.Equal(t, resp.Err.Errno, base.ERR_OK.String())
+    }
+}
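
Note: the sketch below is an illustrative usage example, not part of the patch. It assumes a locally running onebox cluster whose meta servers listen on 0.0.0.0:34601-34603 (the same addresses the new test uses) and it drives the session-level API directly, the way TestNodeSession_ForwardToPrimaryMeta does; real applications would normally go through the higher-level pegasus client package instead. Only one meta address is configured on purpose: if that server is not the primary, it replies ERR_FORWARD_TO_OTHERS, and with this patch the client appends the forwarded primary address to its meta list, opens a session to it, and retries the query.

package main

import (
    "context"
    "fmt"
    "time"

    "github.com/apache/incubator-pegasus/go-client/session"
)

func main() {
    // Deliberately configure a single meta server; it may not be the primary.
    mm := session.NewMetaManager([]string{"0.0.0.0:34602"}, session.NewNodeSession)
    defer mm.Close()

    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()

    // "temp" is the table queried by the tests; substitute any existing table.
    // If 0.0.0.0:34602 is not the primary meta, the request is forwarded to
    // the primary automatically and the call still returns ERR_OK.
    resp, err := mm.QueryConfig(ctx, "temp")
    if err != nil {
        fmt.Println("QueryConfig failed:", err)
        return
    }
    fmt.Println("QueryConfig error code:", resp.Err.Errno)
}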