DiskDataProviders

This package implements data structures that are iterable and backed by a buffer that is fed with data from disk. If reading and preprocessing a datapoint is faster than one training step, a ChannelDiskDataProvider is recommended; if the training step is fast but reading data takes a long time, use a QueueDiskDataProvider. Both types do the reading on a separate thread, so make sure Julia is started with at least two threads.
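
A quick sanity check at the top of a training script might look like this (a minimal sketch; the --threads flag requires Julia 1.5 or later):

using Base.Threads
# Launch Julia with e.g. `julia --threads 2`, or set JULIA_NUM_THREADS=2 before starting
@assert nthreads() >= 2 "Start Julia with at least two threads so the reader thread does not block training"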

My personal use case for this package is training convolutional DL models using Flux. This package does not take care of the transfer of data to the GPU, as I have not managed to do this on a separate thread.

Supervised vs unsupervised

If the task is supervised, you may supply labels using the keyword labels; see the example below. If the dataset has labels, it iterates tuples (x, y); if no labels are supplied, it iterates only inputs x. To create an unsupervised dataset with no labels, use Nothing as the label type, e.g., DiskDataProvider{xType, Nothing}.
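
For instance, an unsupervised provider could be constructed like this (a sketch reusing T, batch_size, queue_length and files from the example below):

# Nothing as the label type: iteration yields only inputs x, no (x, y) tuples
dataset = ChannelDiskDataProvider{Vector{Float64}, Nothing}((T,), batch_size, queue_length; files=files)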

Usage example

using DiskDataProviders, Test, Serialization

# === Create some random example data ===
dirpath = mktempdir()*"/"
N = 100
T = 500
batch_size = 2
queue_length = 5 # Length of the internal buffer; making this an integer multiple of the batch size is a good idea.
labs = rand(1:5, N)
for i = 1:N
    a = randn(T)
    serialize(dirpath*"$(i).bin", (a, labs[i]))
end

files = dirpath .* string.(1:N) .* ".bin"

# === Create a DiskDataProvider ===
dataset = ChannelDiskDataProvider{Vector{Float64}, Int}((T,), batch_size, queue_length; labels=labs, files=files)
ChannelDiskDataProvider{Array{Float64,1},Int64,Array{Float32,2}}, length: 100

The dataset is iterable and can be used in loops etc. One can also create a batchview, which is an iterator over batches. The batch size is defined when the DiskDataProvider is created.

julia> # === Example usage of the provider ===
       datasett, datasetv = stratifiedobs(dataset, 0.75)
(ChannelDiskDataProvider{Array{Float64,1},Int64,Array{Float32,2}}, length: 75
, ChannelDiskDataProvider{Array{Float64,1},Int64,Array{Float32,2}}, length: 25
)

julia> sort(dataset.ulabels) == 1:5
true

julia> x,y = first(dataset) # Get one datapoint
([-1.763292377316459, -0.6108635948021169, -0.2858491008448278, -0.7239703958933539, 0.5882474885188901, 2.327744602371069, 0.7657358565623774, -0.6872386715259935, -0.5425005085049398, 0.07465848788446108  …  0.3347707856384763, -1.1135298682516108, -0.6444191161510273, 0.7768769999675971, 0.08517831417648923, 0.20035131766937353, -0.1858360867617615, -1.421256817272106, -0.533739341128714, -0.3965527454657803], 1)

julia> t = start_reading(dataset) # this function initiates the reading into the buffer
[ Info: Populating queue continuously. Call `stop!(d)` to stop reading. Call `wait(d)` to be notified when the queue is fully populated.
Task (runnable) @0x00007fd6111424a0

julia> wait(dataset) # Wait for the reading to start before proceeding

julia> bw = batchview(dataset)
DiskDataProviders.var"##409"(0x00, #undef, ChannelDiskDataProvider{Array{Float64,1},Int64,Array{Float32,2}}, length: 100
, #undef, #undef, 2, #undef, #undef)

julia> xb,yb = first(bw) # Get the first batch from the buffer
(Float32[1.001593 -1.7632924; -0.09906414 -0.61086357; … ; 0.83525735 -0.5337393; -1.1720904 -0.39655274], [1, 4])

julia> for (x,y) in bw # Iterate the batches in the batchview
           # do something with the data
       end

julia> stop!(dataset) # Stop reading into the buffer
false

If your data has more than one dimension, e.g., if the inputs are matrices or 3-d tensors, you create a DiskDataProvider like this

dataset = ChannelDiskDataProvider((nrows,ncols,nchannels), batchsize, queuelength; labels=labs, files=files)

Notice that you have to provide nchannels, which is 1 if the input is a matrix.
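
For example, matrix-valued datapoints might be handled like this (a sketch mirroring the 1-d example above; nrows, ncols and the element type Matrix{Float64} are assumptions):

using Serialization
nrows, ncols = 28, 28
for i = 1:N
    img = randn(nrows, ncols)
    serialize(dirpath*"img_$(i).bin", (img, labs[i]))
end
imgfiles = dirpath .* "img_" .* string.(1:N) .* ".bin"
# nchannels is 1 since each datapoint is a matrix
dataset = ChannelDiskDataProvider{Matrix{Float64}, Int}((nrows, ncols, 1), batch_size, queue_length; labels=labs, files=imgfiles)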

Preprocess data

All functionality in this package operates on serialized, preprocessed data files. Serialized files are fast to read, and storing already preprocessed data cuts down on overhead. This package does not currently support arbitrary file formats; the files are read using Julia's built-in deserializer.
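
A preprocessing pass might therefore look like the following sketch, where preprocess and rawdata are placeholders for your own transformation and source data:

using Serialization, Statistics
preprocess(x) = (x .- mean(x)) ./ std(x)    # hypothetical preprocessing step
for (i, raw) in enumerate(rawdata)          # rawdata: your raw data, assumed loaded elsewhere
    serialize(dirpath*"$(i).bin", (preprocess(raw), labs[i]))
end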

Iterators

Typically, you want to use batchview for training. If you have a small enough dataset (e.g., for validation), you may want to use full_batch, especially if it fits into GPU memory. Batches are structured according to Flux's notion of a batch, i.e., the last dimension is the batch dimension.
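
Put together, a train/validate workflow might look like this (a sketch using the datasett/datasetv split from the usage example; the model update is left as a comment, and full_batch is assumed to return data in the same (x, y) layout as the iterators):

t = start_reading(datasett)        # feed the buffer on a background thread
wait(datasett)
for epoch in 1:10
    for (x, y) in batchview(datasett)   # the last dimension of x is the batch dimension
        # update the model on (x, y), e.g., with Flux
    end
end
Xval, Yval = full_batch(datasetv)  # the whole validation set as a single batch
stop!(datasett)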

Methods manipulating datasets

Base.getindex(d::AbstractDiskDataProvider, inds::AbstractArray)

Get a dataset corresponding to a subset of the file indices.

Base.split(d::AbstractDiskDataProvider, i1, i2)

Split the dataset into two parts defined by vectors of indices.

Random.shuffle(d::AbstractDiskDataProvider)

Return a new dataset with the file order shuffled.

Random.shuffle!(d::AbstractDiskDataProvider)

Shuffle the file order in place.
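
As a sketch, these methods compose like this on the dataset from the usage example:

using Random
subset = dataset[1:10]                  # provider over the first ten files
d1, d2 = split(dataset, 1:50, 51:100)   # two disjoint providers
shuffled = shuffle(dataset)             # new provider with shuffled file order
shuffle!(dataset)                       # shuffle the file order in place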

Exported functions and types

ChannelDiskDataProvider(d::ChannelDiskDataProvider, inds::AbstractArray)

This constructor can be used to create a data provider that is a subset of another.

ChannelDiskDataProvider{XT, YT}(xsize, batchsize, queuelength::Int; kwargs...) where {XT, YT}

Constructor for ChannelDiskDataProvider. {XT, YT} are the types of the input and output, respectively.

Arguments:

  • xsize: tuple with the size of each datapoint
  • batchsize: how many datapoints to put in a batch
  • queuelength: length of the buffer; making this an integer multiple of the batch size is a good idea
  • kwargs: set the other fields of the structure
  • transform: a function (x,y)->(x,y) or x->x that transforms a datapoint before it is put in a batch, e.g., to apply preprocessing or normalization

QueueDiskDataProvider(d::QueueDiskDataProvider, inds::AbstractArray)

This constructor can be used to create a data provider that is a subset of another.

QueueDiskDataProvider{XT, YT}(xsize, batchsize, queuelength::Int; kwargs...) where {XT, YT}

Constructor for QueueDiskDataProvider. {XT, YT} are the types of the input and output, respectively.

Arguments:

  • xsize: tuple with the size of each datapoint
  • batchsize: how many datapoints to put in a batch
  • queuelength: length of the buffer; making this an integer multiple of the batch size is a good idea
  • kwargs: set the other fields of the structure
  • transform: a function (x,y)->(x,y) or x->x that transforms a datapoint before it is put in a batch, e.g., to apply preprocessing or normalization
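
For example, a per-datapoint normalization could be passed via the transform keyword (a sketch; normalize is a hypothetical helper, and the remaining variables are those of the usage example):

using Statistics
normalize(x, y) = ((x .- mean(x)) ./ (std(x) + 1e-8), y)
dataset = ChannelDiskDataProvider{Vector{Float64}, Int}((T,), batch_size, queue_length;
    labels=labs, files=files, transform=normalize)
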
labels(d)

Return numeric labels in the dataset, i.e., strings are converted to integers etc.

sample_input(d::AbstractDiskDataProvider, y)

Sample one input with label y from the dataset.

sample_input(d::AbstractDiskDataProvider)

Sample one datapoint from the dataset.

sample_label(d)

Sample a random label from the dataset.
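
As a sketch, these can be combined to draw class-conditional samples:

y  = sample_label(dataset)      # draw a random label
x  = sample_input(dataset, y)   # draw one input with that label
x2 = sample_input(dataset)      # draw any datapoint
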
start_reading(d::AbstractDiskDataProvider)

Initialize reading into the buffer. This function has to be called before the dataset is used. Reading will continue until you call stop! on the dataset. If the dataset is a ChannelDiskDataProvider, this is a non-issue.

batchview(d::AbstractDiskDataProvider, size=d.batchsize; kwargs...)

Create a batch iterator that iterates batches with the batch size defined at the creation of the DiskDataProvider.

buffered(d::AbstractDiskDataProvider)

Creates an iterator which uses the underlying buffer in the dataset.

full_batch(d::AbstractDiskDataProvider)

Returns a matrix with the entire dataset.

unbuffered(d::AbstractDiskDataProvider)

Creates an iterator which does not use the underlying buffer in the dataset.

unbuffered_batchview(d::AbstractDiskDataProvider, size=d.batchsize)

Iterate unbuffered batches. See also batchview.
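
For instance, a single unbuffered pass over the data, which does not require a background reader, might look like this sketch:

for (x, y) in unbuffered_batchview(dataset)
    # evaluate the model on (x, y); no background reader is required
end
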
stratifiedobs(d::AbstractDiskDataProvider, p::AbstractFloat, args...; kwargs...)

Partition the data into multiple disjoint subsets, proportional to the value(s) of p. The observations are assigned to a data subset using stratified sampling without replacement. The subsets are returned as a tuple, where the first element contains the fraction of observations specified by the first float in p.

For example, if p is a Float64 itself, the return value will be a tuple with two datasets (i.e., subsets), in which the first element contains the fraction of observations specified by p and the second element contains the rest. In the following code the first subset train will contain around 70% of the observations and the second subset test the rest.

train, test = stratifiedobs(diskdataprovider, 0.7)