Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

jsonrpc: stdio #1558

Merged
merged 1 commit into from
Nov 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
129 changes: 129 additions & 0 deletions x/fakenet/conn.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
// Copyright 2018 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package fakenet

import (
"io"
"net"
"sync"
"time"
)

// NewConn returns a net.Conn built on top of the supplied reader and writer.
// It decouples the read and write on the conn from the underlying stream
// to enable Close to abort ones that are in progress.
// It's primary use is to fake a network connection from stdin and stdout.
func NewConn(name string, in io.ReadCloser, out io.WriteCloser) net.Conn {
c := &fakeConn{
name: name,
reader: newFeeder(in.Read),
writer: newFeeder(out.Write),
in: in,
out: out,
}
go c.reader.run()
go c.writer.run()
return c
}

type fakeConn struct {
name string
reader *connFeeder
writer *connFeeder
in io.ReadCloser
out io.WriteCloser
}

type fakeAddr string

// connFeeder serializes calls to the source function (io.Reader.Read or
// io.Writer.Write) by delegating them to a channel. This also allows calls to
// be intercepted when the connection is closed, and cancelled early if the
// connection is closed while the calls are still outstanding.
type connFeeder struct {
source func([]byte) (int, error)
input chan []byte
result chan feedResult
mu sync.Mutex
closed bool
done chan struct{}
}

type feedResult struct {
n int
err error
}

func (c *fakeConn) Close() error {
c.reader.close()
c.writer.close()
c.in.Close()
c.out.Close()
return nil
}

func (c *fakeConn) Read(b []byte) (n int, err error) { return c.reader.do(b) }
func (c *fakeConn) Write(b []byte) (n int, err error) { return c.writer.do(b) }
func (c *fakeConn) LocalAddr() net.Addr { return fakeAddr(c.name) }
func (c *fakeConn) RemoteAddr() net.Addr { return fakeAddr(c.name) }
func (c *fakeConn) SetDeadline(t time.Time) error { return nil }
func (c *fakeConn) SetReadDeadline(t time.Time) error { return nil }
func (c *fakeConn) SetWriteDeadline(t time.Time) error { return nil }
func (a fakeAddr) Network() string { return "fake" }
func (a fakeAddr) String() string { return string(a) }

func newFeeder(source func([]byte) (int, error)) *connFeeder {
return &connFeeder{
source: source,
input: make(chan []byte),
result: make(chan feedResult),
done: make(chan struct{}),
}
}

func (f *connFeeder) close() {
f.mu.Lock()
if !f.closed {
f.closed = true
close(f.done)
}
f.mu.Unlock()
}

func (f *connFeeder) do(b []byte) (n int, err error) {
// send the request to the worker
select {
case f.input <- b:
case <-f.done:
return 0, io.EOF
}
// get the result from the worker
select {
case r := <-f.result:
return r.n, r.err
case <-f.done:
return 0, io.EOF
}
}

func (f *connFeeder) run() {
var b []byte
for {
// wait for an input request
select {
case b = <-f.input:
case <-f.done:
return
}
// invoke the underlying method
n, err := f.source(b)
// send the result back to the requester
select {
case f.result <- feedResult{n: n, err: err}:
case <-f.done:
return
}
}
}
5 changes: 1 addition & 4 deletions x/jsonrpc2/jsonrpc2test/jsonrpc2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,6 @@ import (
func TestNetPipe(t *testing.T) {
jsonrpc2.SetDebug(jsonrpc2.DbgFlagCall)
ctx := context.Background()
listener, err := jsonrpc2test.NetPipeListener(ctx)
if err != nil {
t.Fatal(err)
}
listener := jsonrpc2test.NetPipeListener()
cases.Test(t, ctx, listener, jsonrpc2.HeaderFramer(), true)
}
4 changes: 2 additions & 2 deletions x/jsonrpc2/jsonrpc2test/pipe.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,11 @@ import (
// It is only possibly to connect to it using the Dialer returned by the
// Dialer method, each call to that method will generate a new pipe the other
// side of which will be returned from the Accept call.
func NetPipeListener(ctx context.Context) (jsonrpc2.Listener, error) {
func NetPipeListener() jsonrpc2.Listener {
return &netPiper{
done: make(chan struct{}),
dialed: make(chan io.ReadWriteCloser),
}, nil
}
}

// netPiper is the implementation of Listener build on top of net.Pipes.
Expand Down
32 changes: 0 additions & 32 deletions x/jsonrpc2/stdio/jsonrpc2_test.go

This file was deleted.

138 changes: 10 additions & 128 deletions x/jsonrpc2/stdio/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,162 +18,44 @@ package stdio

import (
"context"
"errors"
"io"
"log"
"net"
"os"
"sync/atomic"
"time"

"github.com/goplus/gop/x/fakenet"
"github.com/goplus/gop/x/jsonrpc2"
"github.com/goplus/gop/x/jsonrpc2/jsonrpc2test"
)

// -----------------------------------------------------------------------------

var (
ErrTooManyConnections = errors.New("too many connections")
)

var (
connCnt int32
)

type serverConn struct {
closed bool
}

func (p *serverConn) Read(b []byte) (n int, err error) {
if p.closed {
return 0, net.ErrClosed
}
n, err = os.Stdin.Read(b)
for n == 0 { // retry (to support localDailer)
time.Sleep(time.Millisecond)
n, err = os.Stdin.Read(b)
}
return
}

func (p *serverConn) Write(b []byte) (n int, err error) {
if p.closed {
return 0, net.ErrClosed
}
return os.Stdout.Write(b)
}

func (p *serverConn) Close() error {
if jsonrpc2.Verbose {
log.Println("==> stdio.conn.Close")
}
if p.closed {
return net.ErrClosed
}
p.closed = true
atomic.AddInt32(&connCnt, -1)
return nil
}

// -----------------------------------------------------------------------------

type newConn struct {
in, out *os.File
r, w *os.File
oStdin *os.File
oStdout *os.File
closed bool
}

func (p *newConn) Read(b []byte) (n int, err error) {
if p.closed {
return 0, net.ErrClosed
}
return p.in.Read(b)
}

func (p *newConn) Write(b []byte) (n int, err error) {
if p.closed {
return 0, net.ErrClosed
}
return p.out.Write(b)
}

func (p *newConn) Close() error {
if jsonrpc2.Verbose {
log.Println("==> stdio.newConn.Close")
}
if p.closed {
return net.ErrClosed
}
p.closed = true
atomic.AddInt32(&dailCnt, -1)
os.Stdin, os.Stdout = p.oStdin, p.oStdout
p.in.Close()
p.w.Close()
p.r.Close()
p.out.Close()
return nil
}

// -----------------------------------------------------------------------------

type listener struct {
closed bool
localDail bool
}

func (p *listener) Close() error {
if jsonrpc2.Verbose {
log.Println("==> stdio.listener.Close")
}
if p.closed {
return net.ErrClosed
}
p.closed = true
return nil
}

// Accept blocks waiting for an incoming connection to the listener.
func (p *listener) Accept(context.Context) (io.ReadWriteCloser, error) {
if atomic.AddInt32(&connCnt, 1) != 1 {
atomic.AddInt32(&connCnt, -1)
connCnt := &connCnt[server]
if atomic.AddInt32(connCnt, 1) != 1 {
atomic.AddInt32(connCnt, -1)
return nil, ErrTooManyConnections
}
return new(serverConn), nil
}

// Dial returns a new communication byte stream to a listening server.
func (p *listener) Dial(ctx context.Context) (ret io.ReadWriteCloser, err error) {
if atomic.AddInt32(&dailCnt, 1) != 1 {
atomic.AddInt32(&dailCnt, -1)
return nil, ErrTooManyConnections
}
in, w, err := os.Pipe()
if err != nil {
return
}
r, out, err := os.Pipe()
if err != nil {
in.Close()
w.Close()
return
}
oStdin, oStdout := os.Stdin, os.Stdout
os.Stdin, os.Stdout = r, w
return &newConn{in: in, out: out, r: r, w: w, oStdin: oStdin, oStdout: oStdout}, nil
return fakenet.NewConn("stdio", os.Stdin, os.Stdout), nil
}

func (p *listener) Dialer() jsonrpc2.Dialer {
if p.localDail {
return p
}
return nil
}

// Listener returns a jsonrpc2.Listener based on stdin and stdout.
func Listener(allowLocalDail bool) jsonrpc2.Listener {
l := &listener{localDail: allowLocalDail}
return l
if allowLocalDail {
return jsonrpc2test.NetPipeListener()
}
return &listener{}
}

// -----------------------------------------------------------------------------
Loading