
Reading Named Pipes with Elixir

How to go about reading data from special files like named pipes

Aug 14, 2017
4 min read

TLDR

iex(1)> Port.open('myfile', [{:line, 4096}, :eof])
iex(2)> flush

This works, but the full answer is more complicated. Read on if you want the whole explanation.

The Story

This past week I was migrating yet another company off of MongoDB. The database size was fairly large and I wanted an easy way to stream the data out progressively instead of downloading the entire dataset. I decided to use mongoexport and pipe the results out to a UNIX named pipe.

mkfifo mongo
mongoexport --db --collection --out mongo

Now I have a special file that I can read line by line and process the data. It is totally resumable in case my migration script crashes. Named pipes also block the writer unless there is a reader, thereby providing back pressure.

A Mysterious Error

Trying to read this file using File.stream! gave me a mysterious error:

iex(1)> File.stream!("mongo") |> Enum.take(10)
** (File.Error) could not stream "mongo": illegal operation on a directory

Directory? But I’m trying to open a file. Let’s try simply opening the file:

iex(1)> File.open('mongo')
{:error, :eisdir}

Strange, Elixir seems to think this named pipe is a directory. After some intense Googling I found that this error is a misnomer. It simply indicates that Erlang will not open special files through the standard file I/O API.
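One way to see what the runtime thinks it is looking at is to ask File.stat for the file type before trying to open it. The sketch below is not from my migration script; the module name FileKind is made up for illustration, and it assumes the runtime reports a named pipe with the type :other (the catch-all Erlang uses for anything that is not a regular file, directory, symlink, or device).

```elixir
defmodule FileKind do
  # Hypothetical helper: returns true when `path` exists but is a special
  # file (named pipe, socket, device node) rather than a regular file or
  # directory. Missing paths return false.
  def special?(path) do
    case File.stat(path) do
      {:ok, %File.Stat{type: type}} -> type in [:other, :device]
      {:error, _reason} -> false
    end
  end
end

# "mongo" is the named pipe created with mkfifo earlier in the post.
IO.inspect(FileKind.special?("mongo"))
```

A check like this makes it possible to fail early with a clearer message than :eisdir.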

The Solution

The solution is ports! You can open up a named pipe directly as a port.

iex(1)> Port.open('mongo', [{:line, 4096}, :eof])
#Port<0.1232>
iex(2)> flush
{#Port<0.1232>, {:data, {:eol, ''}}}

Notice the single quotes. Port.open is a thin wrapper around the Erlang function open_port, which expects an Erlang charlist as input. Opening a port will immediately start reading the contents of the target and delivering them as messages to your current process.

I also pass in {:line, 4096} as an option, which delivers each line as a single message using a 4096-byte line buffer. Lines longer than the buffer arrive in several pieces. The :eof option indicates you would like to receive an additional message when the file is closed. All the options are described in the Erlang documentation for open_port.

Now you can write a simple receive loop that pulls messages out of the mailbox and processes the data. Wrapped in a stream, these messages can be piped through Task.async_stream for concurrent processing (yay Elixir). However, there’s another problem with this solution.
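Before getting to that problem, here is a rough sketch of what such a receive loop might look like. It is a reconstruction rather than the code I actually ran: the module name PipeReader and the handle_line callback are made up for illustration. It relies on the message shapes a port produces with the {:line, n} and :eof options, namely {:eol, data} for a complete line, {:noeol, data} for a piece of a line longer than the buffer, and a final :eof message.

```elixir
defmodule PipeReader do
  # Consumes messages from a port opened with [{:line, n}, :eof] and calls
  # `handle_line` once per complete line. `partial` accumulates the pieces
  # of a line that was longer than the line buffer.
  def read_lines(port, handle_line, partial \\ []) do
    receive do
      # The line exceeded the buffer: keep this piece and wait for the rest.
      {^port, {:data, {:noeol, chunk}}} ->
        read_lines(port, handle_line, [partial, chunk])

      # End of a line: join any buffered pieces and hand the line over.
      {^port, {:data, {:eol, chunk}}} ->
        handle_line.(IO.iodata_to_binary([partial, chunk]))
        read_lines(port, handle_line, [])

      # Delivered once because the port was opened with :eof.
      {^port, :eof} ->
        :ok
    end
  end
end
```

With the port returned by the Port.open call above bound to port, usage would be along the lines of PipeReader.read_lines(port, &IO.puts/1).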

No Backpressure

Since I was migrating a large amount of data, the process took a while to run. I noticed that memory use was slowly increasing over time. This made no sense to me, as the whole point of using a named pipe was to limit the memory used to only the entries currently being processed.

What I realized after some profiling is that the Port was reading from the pipe as fast as possible and loading those messages into the process mailbox. The process was not keeping up reading and processing the messages so the number of pending messages kept increasing.
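The same effect can be reproduced without a pipe at all. The snippet below is only an illustration (MailboxDemo is a made-up name, and the process that never reads stands in for my slow consumer): it sends messages to a process that never calls receive, then asks how many are queued. None of the sends block, and every message stays in memory.

```elixir
defmodule MailboxDemo do
  # Sends `n` messages to a process that never reads its mailbox and
  # returns how many messages are waiting there afterwards.
  def flood(n) do
    pid = spawn(fn -> Process.sleep(:infinity) end)
    Enum.each(1..n, fn i -> send(pid, {:line, i}) end)
    {:message_queue_len, len} = Process.info(pid, :message_queue_len)
    # Clean up the idle process so the demo does not leak it.
    Process.exit(pid, :kill)
    len
  end
end

IO.puts("pending messages: #{MailboxDemo.flood(10_000)}")
```

Checking Process.info(pid, :message_queue_len) on the process that owns the port is also a quick way to confirm this kind of backlog in a running system.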

Erlang ports cannot provide back pressure to the pipe, since they deliver each line as an Erlang message. Unlike a send on a Go channel, a send to an Erlang mailbox never blocks, even when the receiver has fallen behind. After some further Googling I found that Erlang very intentionally does not support opening devices like regular files, as documented in the Erlang FAQ:

9.12 Why can’t I open devices (e.g. a serial port) like normal files?

Short answer: because the erlang runtime system was not designed to do that. The Erlang runtime system’s internal file access system, efile, must avoid blocking, otherwise the whole Erlang system will block. This is not a good thing in a soft real-time system. When accessing regular files, it’s generally a reasonable assumption that operations will not block. Devices, on the other hand, are quite likely to block. Some devices, such as serial ports, may block indefinitely. There are several possible ways to solve this. The Erlang runtime system could be altered, or an external port program could be used to access the device. Two mailing list discussions about the topic can be found here and here.

A Hack I Cannot Recommend

Since Elixir cannot internally provide back pressure, the only option was to provide it externally. I decided to use the head utility combined with Elixir’s Stream.resource to create a batched stream.

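batch_size = 1000

Stream.resource(
  # The accumulator counts how many batches have been read so far.
  fn -> 0 end,
  fn batch ->
    # Blocks until head has read up to batch_size lines from the pipe.
    {result, 0} = System.cmd("head", ["-n", Integer.to_string(batch_size), "mongo"])
    # trim: true drops the empty string left by the trailing newline.
    lines = String.split(result, "\n", trim: true)
    IO.puts("Loaded #{batch_size * (batch + 1)}")
    {lines, batch + 1}
  end,
  # Nothing to clean up, since head exits after every batch.
  fn _ -> :ok end
)
|> Task.async_stream(&do_some_work/1, max_concurrency: 1000)

This creates a stream of the lines in the pipe but only ever reads 1000 of them into memory at a time.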

While this does work, it is clear at this point that Elixir isn’t the right tool for the job. As much as I love Elixir, I was able to rewrite my script in Go in about 30 minutes and had it working perfectly. It was a great reminder to keep a wide set of options in your toolkit so you can use the right one for the job.

Dax Raad
I build things then try to remember to write about them