Document stream ownership and v2 early-close behavior#57
Conversation
There was a problem hiding this comment.
Pull request overview
This PR documents a behavioral difference between go-pipe v1 and v2 around early-consumer exit and producer-side pipe write errors, and adds/adjusts tests to validate the v2 behavior (including how IgnoreError(..., IsPipeError) can be used to safely ignore these errors for stateless producers).
Changes:
- Document v2’s more direct surfacing of producer-side pipe errors when downstream stages stop reading early, with guidance/patterns for stateful producers.
- Remove Windows-specific skips from several pipeline tests that shell out to Unix utilities, and drop the
runtimeimport. - Add new tests asserting (a) Go producers can observe pipe errors when a command exits without reading stdin and (b)
IgnoreError(IsPipeError)allows stateful producers to finish processing.
Show a summary per file
| File | Description |
|---|---|
| README.md | Adds v1→v2 migration note explaining early-close pipe errors and how to handle them. |
| pipe/pipeline_test.go | Updates pipeline tests (removes Windows skips) and adds new coverage for producer-side pipe errors + IgnoreError(IsPipeError) behavior. |
Copilot's findings
Tip
Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
- Files reviewed: 2/2 changed files
- Comments generated: 9
Replace Stage.Start close flags with InputStream and OutputStream types. The endpoint constructors encode whether a stream is closing while preserving the underlying reader/writer type. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Add regression coverage for the go-pipe v2 behavior where a Go producer can see a pipe error directly when a downstream command exits without fully reading stdin. This documents the visible semantic change from v1 to v2. Also, we add an example that demonstrates correctly handling downstream early exit in a stateful producer.
8a9f705 to
0ca3ba9
Compare
Explain that go-pipe v1 could hide producer-side pipe write errors as an implementation detail of the command stdin-copy path, while v2 exposes those errors more directly. Document the migration patterns for stateless and stateful producers.
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
0ca3ba9 to
433e104
Compare
mhagger
left a comment
There was a problem hiding this comment.
LGTM as another step towards v2 👍
This is to force all close calls to come through `InputStream.Close()` and `OutputStream.Close()`, which will soon get some more functionality.
This will be needed in a moment.
Ignore all but the first call to `Close()`.
In the type comments, explain why these types don't implement `io.Reader` and `io.Writer`. Otherwise, some helpful person is sure to come along and add `Read()` and `Write()` methods, to the detriment of performance and even changing some semantics.
Change the types of some `Pipeline` fields: * `stdin` to `InputStream` * `stdout` to `OutputStream` That way we don't have to manage their closers separately.
The two aspects of a stage's stream requirements are coupled. For example, of the four possible combinations of `(StreamRequirement, NeedsFile)`, one of them, `(StreamForbidden, true)`, makes no sense. So instead of encoding these two aspects separately, encode the three meaningful combinations into a single `StreamRequirement` type with possible values `StreamAcceptAny`, `StreamPreferFile`, and `StreamForbidden`.
The old code used a `stageStarter` type to help with starting up stages. There are two awkward things about that approach: * The pipe that is needed to join stages depends on the two adjacent stages, not on the stdin and stdout of a single stage. So `stageStarter` wasn't able to figure out what pipe was needed; that logic kindof needed to live in the `Pipeline` type. * Since the read and write ends of a pipe are created together, the `stageStarter` could also not be as helpful in closing those streams if there was an error. So instead, use a new helper type, `stageJoiner`, to figure out how to join two adjacent stages together. The `stageJoiner` can now do part of the work for us: * `stageJoiner.validate()` checks that the adjacent stages' requirements are met WRT whether they need stdin/stdout to be supplied at all. * `stageJoiner.createPipe()` figures out what kind of pipe to create for the two adjacent stages. * `stageJoiner.closePipe()` closes the pipe (if it has already been created and needs closing) on errors. This isn't a panacea; for example, some loops have to iterate over stages, and some over stage joiners. But I think that it's less confusing this way.
| if p.stdout == nil { | ||
| // No stages and no destination: there is nothing to do | ||
| // and nowhere to put `p.stdin` even if it was set. | ||
| return nil | ||
| } |
There was a problem hiding this comment.
In this case p.stderr might still need to be closed.
There was a problem hiding this comment.
@mhagger p here is a Pipeline. It doesn't have a stderr? Only stdin/stdout.
There was a problem hiding this comment.
Oops, I meant p.stdin. Even though p.stdout is nil, the user might have set a closing stdin, and it's our responsibility to close it in this error path. But I think that this has been fixed by the stageJoiner change.
There was a problem hiding this comment.
Yeah, I thought you might mean that, and came to the same conclusion. There was in fact no way previously to have a non-nil stdinCloser, and although it was an easy refactor to remove the field, the entire problem evaporated with the stageJoiner changes. 🎉
Move the part of the explanation that is specific to command stages to `command.go`.
needFilePipe() and validate() each call Stage.Requirements() for the adjacent stages, so a stage's requirements can be computed/allocated twice during startup. Caching the result on the joiner lets us call Requirements() only once.
Reorder Start() to run stageJoiner.validate() before createPipe(), and make validate() decide whether a stream is "connected" from the pipeline topology (the presence of an adjacent stage or an already-set input/output stream). This lets us reject an invalid pipeline without allocating any io.Pipe() objects, and in the case of os.Pipe(), doing the system calls to create OS-level file descriptors.
Make the stream types more capable and use them in more places
Use a `stageJoiner` to join stages together
|
Oops, this branch has itself been a collection point for other PRs (I didn't realize that it hasn't been merged yet). I think we should merge it into |
This branch builds on
znull/stage-v2-cleanby introducing explicit endpoint stream ownership types and documenting the behavior changes that fall out of the v2 pipeline wiring.It replaces boolean stdin/stdout close flags in the
Stagestartup contract withInputStreamandOutputStreamtypes, so ownership is visible in the value passed to each stage. The stream ownership docs now spell out when stages must close closing streams, includingStart()error paths, and the startup cleanup code has been aligned so the pipeline only closes streams it still owns.WithStdoutCloseris still closed when startup fails before the final stage receives it.It also documents v2 early-close producer behavior. A downstream stage can stop reading before all input is consumed, causing upstream writes to surface
EPIPE,SIGPIPE, orio.ErrClosedPipe; README and public godoc now point callers atIgnoreError(stage, IsPipeError)for stateless producers and call out the extra consistency work required for stateful producers.