Skip to content

Commit

Permalink
Engine output handling (#3339)
Browse files Browse the repository at this point in the history
* Engine output handling

* Add buffering of output

* Tests update
  • Loading branch information
denis256 committed Aug 13, 2024
1 parent c818ea4 commit e0c5cbe
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 22 deletions.
64 changes: 42 additions & 22 deletions engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -448,38 +448,33 @@ func invoke(ctx context.Context, runOptions *ExecutionOptions, client *proto.Eng
return nil, errors.WithStackTrace(err)
}

cmdStdout := runOptions.CmdStdout
cmdStderr := runOptions.CmdStderr
var stdoutBuf, stderrBuf bytes.Buffer
stdout := io.MultiWriter(runOptions.CmdStdout, &stdoutBuf)
stderr := io.MultiWriter(runOptions.CmdStderr, &stderrBuf)

var stdoutBuf bytes.Buffer
var stderrBuf bytes.Buffer
var stdoutLineBuf, stderrLineBuf bytes.Buffer
var resultCode int

stdout := io.MultiWriter(cmdStdout, &stdoutBuf)
stderr := io.MultiWriter(cmdStderr, &stderrBuf)
// read stdout and stderr from engine
var resultCode = 0
for {
runResp, err := response.Recv()
if err != nil {
break
}
if runResp == nil {
if err != nil || runResp == nil {
break
}
if runResp.GetStdout() != "" {
_, err := stdout.Write([]byte(runResp.GetStdout()))
if err != nil {
return nil, errors.WithStackTrace(err)
}
if err := processStream(runResp.GetStdout(), &stdoutLineBuf, stdout); err != nil {
return nil, errors.WithStackTrace(err)
}
if runResp.GetStderr() != "" {
_, err := stderr.Write([]byte(runResp.GetStderr()))
if err != nil {
return nil, errors.WithStackTrace(err)
}
if err := processStream(runResp.GetStderr(), &stderrLineBuf, stderr); err != nil {
return nil, errors.WithStackTrace(err)
}
resultCode = int(runResp.GetResultCode())
}
if err := flushBuffer(&stdoutLineBuf, stdout); err != nil {
return nil, errors.WithStackTrace(err)
}
if err := flushBuffer(&stderrLineBuf, stderr); err != nil {
return nil, errors.WithStackTrace(err)
}

terragruntOptions.Logger.Debugf("Engine execution done in %v", terragruntOptions.WorkingDir)

if resultCode != 0 {
Expand All @@ -500,6 +495,31 @@ func invoke(ctx context.Context, runOptions *ExecutionOptions, client *proto.Eng
return &cmdOutput, nil
}

// processStream handles the character buffering and line printing for a given stream
func processStream(data string, lineBuf *bytes.Buffer, output io.Writer) error {
for _, ch := range data {
lineBuf.WriteByte(byte(ch))
if ch == '\n' {
if _, err := fmt.Fprint(output, lineBuf.String()); err != nil {
return errors.WithStackTrace(err)
}
lineBuf.Reset()
}
}
return nil
}

// flushBuffer prints any remaining data in the buffer
func flushBuffer(lineBuf *bytes.Buffer, output io.Writer) error {
if lineBuf.Len() > 0 {
_, err := fmt.Fprint(output, lineBuf.String())
if err != nil {
return errors.WithStackTrace(err)
}
}
return nil
}

// initialize engine for working directory
func initialize(ctx context.Context, runOptions *ExecutionOptions, client *proto.EngineClient) error {
terragruntOptions := runOptions.TerragruntOptions
Expand Down
2 changes: 2 additions & 0 deletions test/integration_engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ func TestEngineRunAllOpentofu(t *testing.T) {

assert.Contains(t, stderr, "starting plugin:")
assert.Contains(t, stderr, "plugin process exited:")
assert.Contains(t, stdout, "resource \"local_file\" \"test\"")
assert.Contains(t, stdout, "filename = \"./test.txt\"\n")
assert.Contains(t, stdout, "OpenTofu has been successfull")
assert.Contains(t, stdout, "Tofu Shutdown completed")
assert.Contains(t, stdout, "Apply complete!")
Expand Down

0 comments on commit e0c5cbe

Please sign in to comment.