FIFO signaling producer that consumer has drained all bytes in pipe
Jason Mehrens
jason_mehrens at hotmail.com
Fri Aug 28 23:50:49 UTC 2020
Hello nio-dev,
I am working on a program in which a Java NIO producer of bytes sends data through a FIFO (created with the 'mkfifo' command) to an external process, which is the consumer.
The producer needs a signal that the consumer has finished consuming the bytes, so that the producer can determine when it is safe to delete the FIFO. I've included a test case below that demonstrates the concept using cat as the consumer. So far I think the test demonstrates viability, but it has raised a few questions and it would be awesome to get some insights.
1. FileInputStream::available is the only method that seems to determine the number of bytes in the FIFO. Is opening with SYNC redundant or necessary to ensure all parties communicate state correctly?
2. FileChannel::size always returns zero. It would be much nicer if FileChannel::size returned bytes available. Seems like returning zero violates SeekableByteChannel and FileChannel specs.
3. RandomAccessFile::length fails with java.io.IOException: Illegal seek. The spec seems to indicate that length should return the length of the file. Files(Path)::size, File.length and 'ls' all return zero. The spec doesn't say anything about needing to be able to seek in order to determine the length.
4. Is there any planned JDK support for FIFOs? Currently, there is no way to create a FIFO from Java without resorting to calling an external process, JNI, etc.
5. Any insights on the demo program? What am I missing?
Thanks,
Jason
====
import static java.nio.file.StandardOpenOption.*;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.concurrent.TimeUnit;
/**
 * Demo: a Java NIO producer writes bytes into a FIFO (created with the
 * external {@code mkfifo} command) that is consumed by an external
 * {@code cat} process, and then polls {@link FileInputStream#available()}
 * until the FIFO has been drained so that the FIFO can be deleted.
 *
 * <p>Along the way it prints what the various JDK "size" style methods report
 * for the FIFO.
 */
public class FifoTest {

    /** Number of times the sample line is written into the FIFO. */
    private static final int LINE_COUNT = 10000;

    /**
     * Creates the FIFO, starts the consumer, produces the data, reports the
     * sizes seen through the different APIs, waits for the consumer to drain
     * the FIFO and finally deletes the FIFO.
     *
     * @param args ignored
     * @throws Exception if any I/O step fails, a helper process exits with a
     *         non-zero status, or the thread is interrupted
     */
    public static void main(String[] args) throws Exception {
        /*
        Sample output captured from a run of the original program:

        /tmp/FifoTest2773579150668516034.fifo
        java.io.File::length 0
        java.nio.file.Files::size 0
        java.io.FileInputStream::available 17112
        RandomAccessFile at sun.nio.ch.FileChannelImpl::size 0
        RandomAccessFile at java.io.FileInputStream::available 17112
        java.io.IOException: Illegal seek
        at java.io.RandomAccessFile.length(Native Method)
        at FifoTest.randomAccessFileLength(FifoTest.java:98)
        at FifoTest.main(FifoTest.java:62)
        sun.nio.ch.FileChannelImpl::size 0
        */
        // createTempFile is only used to obtain a unique name; the regular
        // file is removed so that mkfifo can create the FIFO at that path.
        Path fifo = Files.createTempFile(FifoTest.class.getName(), ".fifo");
        Files.delete(fifo);
        System.out.println(fifo);
        mkfifo(fifo);
        try {
            Process cat = cat(fifo);
            try {
                Thread errDrain = new Thread(new Drain(cat.getErrorStream()));
                Thread outDrain = new Thread(new Drain(cat.getInputStream()));
                errDrain.start();
                outDrain.start();
                //Open for READ to ensure open doesn't block waiting for consumer.
                //Not read from as that would steal from consumer.
                try (FileChannel c = FileChannel.open(fifo, READ, WRITE, SYNC)) {
                    fill(c);
                    //OK: output is 0. Matches output of 'ls' as zero bytes so that seems right.
                    System.out.println(File.class.getName() + "::length " + fifo.toFile().length());
                    System.out.println(Files.class.getName() + "::size " + Files.size(fifo));
                    //OK: Gets a byte count. That is nice.
                    inputStreamAvailable(fifo);
                    //???: Fails with Illegal seek. Seems not consistent with FileChannel.
                    randomAccessFileLength(fifo);
                    //??? Not sure if zero is the right choice as you can't check the bytes drained.
                    System.out.println(c.getClass().getName() + "::size " + c.size());
                    //Wait until consumer drains the FIFO.
                    //If the FIFO is drained then it is safe to delete the FIFO.
                    //Would be nice FileChannel::size returned the number of bytes.
                    awaitDrained(fifo, cat);
                }
                // consume() closes the process streams; let the drain threads
                // reach end-of-stream first so they are not still reading
                // from a stream that is being closed underneath them.
                errDrain.join();
                outDrain.join();
                consume(cat);
            } finally {
                cat.destroy();
            }
        } finally {
            Files.delete(fifo);
        }
    }

    /**
     * Writes the sample line {@link #LINE_COUNT} times to the channel.
     *
     * @param c channel opened on the FIFO for writing
     * @throws IOException if a write fails
     */
    private static void fill(FileChannel c) throws IOException {
        ByteBuffer b = ByteBuffer.wrap("Hello fifo!\n".getBytes(StandardCharsets.UTF_8));
        for (int i = 0; i < LINE_COUNT; i++) {
            // A single write may transfer fewer bytes than remain, so keep
            // writing until the whole line has been sent.
            while (b.hasRemaining()) {
                c.write(b);
            }
            // rewind (not flip) keeps the limit at the full line length.
            b.rewind();
        }
    }

    /**
     * Polls the FIFO until no bytes are reported as available or the
     * consumer process has terminated.
     *
     * @param fifo path of the FIFO
     * @param consumer process reading from the FIFO
     * @throws IOException if the FIFO cannot be opened or queried
     * @throws InterruptedException if interrupted while waiting
     */
    private static void awaitDrained(Path fifo, Process consumer)
            throws IOException, InterruptedException {
        try (FileInputStream in = new FileInputStream(fifo.toFile())) {
            while (in.available() > 0) {
                //A better signal will be used in production but this
                //applies a backoff and checks for interruption.
                if (consumer.waitFor(5, TimeUnit.MILLISECONDS)) {
                    break;
                }
            }
        }
    }

    /**
     * Prints what {@code FileChannel::size}, {@code FileInputStream::available}
     * and {@code RandomAccessFile::length} report when the FIFO is opened
     * through a {@link RandomAccessFile}. Any {@link IOException} is printed
     * to standard output instead of being propagated.
     *
     * @param fifo path of the FIFO
     */
    private static void randomAccessFileLength(Path fifo) {
        try (RandomAccessFile raf = new RandomAccessFile(fifo.toFile(), "rws");
                FileChannel fc = raf.getChannel();
                InputStream in = new FileInputStream(raf.getFD())) {
            String prefix = raf.getClass().getSimpleName() + '@';
            System.out.println(prefix + fc.getClass().getName() + "::size " + fc.size());
            System.out.println(prefix + in.getClass().getName() + "::available " + in.available());
            System.out.println(raf.getClass().getName() + "::length " + raf.length());
        } catch (IOException ioe) {
            ioe.printStackTrace(System.out);
        }
    }

    /**
     * Prints what {@code FileInputStream::available} reports for the FIFO.
     * Any {@link IOException} is printed to standard output instead of being
     * propagated.
     *
     * @param fifo path of the FIFO
     */
    private static void inputStreamAvailable(Path fifo) {
        try (FileInputStream in = new FileInputStream(fifo.toFile())) {
            System.out.println(in.getClass().getName() + "::available " + in.available());
        } catch (IOException ioe) {
            ioe.printStackTrace(System.out);
        }
    }

    /**
     * Starts {@code cat} on the given path with its standard error merged
     * into its standard output.
     *
     * @param fifo path passed to {@code cat}
     * @return the started process
     * @throws IOException if the process cannot be started
     * @throws InterruptedException declared for callers; not thrown here
     */
    private static Process cat(Path fifo) throws IOException, InterruptedException {
        ProcessBuilder pb = new ProcessBuilder();
        pb.command("cat", fifo.toString());
        // The no-arg redirectErrorStream() is only a getter; the boolean
        // overload is what actually merges stderr into stdout.
        pb.redirectErrorStream(true);
        return pb.start();
    }

    /**
     * Reads and discards the standard output and then the standard error of
     * the process, waits for it to exit, and destroys it.
     *
     * @param p process to consume
     * @throws IOException if reading fails or the exit status is non-zero
     *         (the message is the exit status)
     * @throws InterruptedException if interrupted while waiting for exit
     */
    private static void consume(Process p) throws IOException, InterruptedException {
        try (InputStream err = p.getErrorStream();
                InputStream out = p.getInputStream()) {
            byte[] b = new byte[1024];
            while (out.read(b) >= 0) {
                // output is intentionally discarded
            }
            while (err.read(b) >= 0) {
                // output is intentionally discarded
            }
            int exit = p.waitFor();
            if (exit != 0) {
                throw new IOException(Integer.toString(exit));
            }
        } finally {
            p.destroy();
        }
    }

    /**
     * Creates a FIFO at the given path by running the external
     * {@code mkfifo} command.
     *
     * @param fifo path at which to create the FIFO
     * @return the same path
     * @throws IOException if the command cannot be started or exits non-zero
     * @throws InterruptedException if interrupted while waiting for it
     */
    private static Path mkfifo(Path fifo) throws IOException, InterruptedException {
        ProcessBuilder pb = new ProcessBuilder();
        pb.command("mkfifo", fifo.toString());
        consume(pb.start());
        return fifo;
    }

    /** Reads a stream to end-of-stream, discarding everything read. */
    private static final class Drain implements Runnable {
        private final InputStream in;

        Drain(final InputStream in) {
            this.in = in;
        }

        @Override
        public void run() {
            try {
                byte[] b = new byte[1024];
                while (in.read(b) >= 0) {
                    // output is intentionally discarded
                }
            } catch (IOException ioe) {
                ioe.printStackTrace();
            }
        }
    }
}
===
More information about the nio-dev
mailing list