DiskDataProviders
This package implements data structures that are iterable and backed by a buffer that is fed with data from disk. If reading and preprocessing the data is faster than one training step, a ChannelDiskDataProvider is recommended; if the training step is fast but reading the data takes a long time, a QueueDiskDataProvider is recommended. Both types do the reading on a separate thread, so make sure Julia is started with at least two threads.
My personal use case for this package is training convolutional DL models using Flux. This package does not take care of the transfer of data to the GPU, as I have not managed to do this on a separate thread.
Supervised vs unsupervised
If the task is supervised, you may supply labels using the keyword labels, see the example below. If the dataset has labels, it iterates tuples (x, y); if no labels are supplied, it iterates only inputs x. To create an unsupervised dataset with no labels, use Nothing as the label type, e.g. DiskDataProvider{xType, Nothing}.
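For instance, a minimal sketch of an unsupervised provider (hypothetical; it assumes each file stores only an input x, and that T, batch_size, queue_length and files are defined as in the usage example below):
unsup = ChannelDiskDataProvider{Vector{Float64}, Nothing}((T,), batch_size, queue_length; files=files)
x = first(unsup) # yields a plain input x, not a tuple (x, y)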
Usage example
using DiskDataProviders, Test, Serialization
# === Create some random example data ===
dirpath = mktempdir()*"/"
N = 100
T = 500
batch_size = 2
queue_length = 5 # Length of the internal buffer, it's a good idea to make this be some integer multiple of the batch size.
labs = rand(1:5, N)
for i = 1:N
    a = randn(T)
    serialize(dirpath*"$(i).bin", (a, labs[i]))
end
files = dirpath .* string.(1:N) .* ".bin"
# === Create a DiskDataProvider ===
dataset = ChannelDiskDataProvider{Vector{Float64}, Int}((T,), batch_size, queue_length; labels=labs, files=files)
ChannelDiskDataProvider{Array{Float64,1},Int64,Array{Float32,2}}, length: 100
The dataset is iterable and can be used in loops etc. One can also create a batchview
, which is an iterator over batches. The batch size is defined when the DiskDataProvider is created.
julia> # === Example usage of the provider ===
datasett, datasetv = stratifiedobs(dataset, 0.75)
(ChannelDiskDataProvider{Array{Float64,1},Int64,Array{Float32,2}}, length: 75
, ChannelDiskDataProvider{Array{Float64,1},Int64,Array{Float32,2}}, length: 25
)
julia> sort(dataset.ulabels) == 1:5
true
julia> x,y = first(dataset) # Get one datapoint
([-1.763292377316459, -0.6108635948021169, -0.2858491008448278, -0.7239703958933539, 0.5882474885188901, 2.327744602371069, 0.7657358565623774, -0.6872386715259935, -0.5425005085049398, 0.07465848788446108 … 0.3347707856384763, -1.1135298682516108, -0.6444191161510273, 0.7768769999675971, 0.08517831417648923, 0.20035131766937353, -0.1858360867617615, -1.421256817272106, -0.533739341128714, -0.3965527454657803], 1)
julia> t = start_reading(dataset) # this function initiates the reading into the buffer
[ Info: Populating queue continuously. Call `stop!(d)` to stop reading. Call `wait(d)` to be notified when the queue is fully populated.
Task (runnable) @0x00007fd6111424a0
julia> wait(dataset) # Wait for the reading to start before proceeding
julia> bw = batchview(dataset)
DiskDataProviders.var"##409"(0x00, #undef, ChannelDiskDataProvider{Array{Float64,1},Int64,Array{Float32,2}}, length: 100
, #undef, #undef, 2, #undef, #undef)
julia> xb,yb = first(bw) # Get the first batch from the buffer
(Float32[1.001593 -1.7632924; -0.09906414 -0.61086357; … ; 0.83525735 -0.5337393; -1.1720904 -0.39655274], [1, 4])
julia> for (x,y) in bw # Iterate the batches in the batchview
# do something with the data
end
julia> stop!(dataset) # Stop reading into the buffer
false
If your data has more dimensions than 1, e.g., if the inputs are matrices or 3D tensors, you create a DiskDataProvider like this:
dataset = ChannelDiskDataProvider((nrows,ncols,nchannels), batchsize, queuelength; labels=labs, files=files)
Notice that you have to provide nchannels, which is 1 if the input is a matrix.
Preprocess data
All functionality in this package operates on serialized, preprocessed data files. Serialized files are fast to read, and storing already preprocessed data cuts down on overhead. This package does not currently support arbitrary file formats; the files are read using Julia's built-in deserializer.
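As an illustration, you might preprocess the raw data once and serialize the results, here with a hypothetical normalization step (raw_signals, outdir and labels are placeholder names, not part of the package):
using Serialization, Statistics
normalize_sample(x) = (x .- mean(x)) ./ std(x)  # hypothetical preprocessing
for (i, raw) in enumerate(raw_signals)          # raw_signals: your raw data, assumed to exist
    serialize(joinpath(outdir, "$(i).bin"), (normalize_sample(raw), labels[i]))
end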
Iterators
- If you simply iterate over an AbstractDiskDataProvider, you will iterate over each datapoint in the sequence determined by the vector of file paths. This iteration is buffered by a buffer unique to the iterator.
- batchview creates a buffered iterator over batches.
- unbuffered creates an iterator that is not buffered.
- buffered iterates over single datapoints from the buffer.
- full_batch creates one enormous batch of the entire dataset.
- unbuffered_batchview iterates over batches, unbuffered.
- For unsupervised datasets (without labels), the buffers are populated by randomly permuting the data files (shuffling). Using the default file iterator, all datapoints are visited in the same order in each epoch.
- For supervised datasets, unique labels are cycled through and a datapoint with that label is drawn uniformly at random.
Typically, you want to use batchview for training. If you have a small enough dataset (e.g. for validation), you may want to use full_batch, especially if it fits into GPU memory. Batches are structured according to Flux's notion of a batch, i.e., the last dimension is the batch dimension.
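As a rough sketch (not part of the package), a training loop over batchview with Flux's older implicit-parameters API could look as follows; the model, loss and optimiser are placeholders, and dataset/datasetv are assumed to come from the usage example above:
using Flux
model = Chain(Dense(T, 32, relu), Dense(32, 5)) # hypothetical classifier: 5 classes, input length T
loss(x, y) = Flux.logitcrossentropy(model(x), Flux.onehotbatch(y, 1:5))
opt = ADAM()
ps  = Flux.params(model)
start_reading(dataset)  # start filling the buffer on a separate thread
wait(dataset)           # block until the buffer is populated
for (x, y) in batchview(dataset)
    gs = Flux.gradient(() -> loss(x, y), ps)
    Flux.Optimise.update!(opt, ps, gs)
end
stop!(dataset)
val_data = full_batch(datasetv) # a small validation set (e.g. from stratifiedobs) as one big batch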
Methods manipulating datasets
Base.getindex — Method.
Base.getindex(d::AbstractDiskDataProvider, inds::AbstractArray)
Get a dataset corresponding to a subset of the file indices.
Base.split — Method.
Base.split(d::AbstractDiskDataProvider, i1, i2)
Split the dataset into two parts defined by the vectors of indices i1 and i2.
Random.shuffle — Method.
Random.shuffle(d::AbstractDiskDataProvider)
Return a new dataset with the file order shuffled.
Random.shuffle! — Method.
Random.shuffle!(d::AbstractDiskDataProvider)
Shuffle the file order in place.
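For example (a sketch, reusing the dataset from the usage example; the index ranges are arbitrary):
using Random
shuffled = shuffle(dataset)  # new provider with shuffled file order
train    = shuffled[1:75]    # subset by file indices via getindex
val      = shuffled[76:100]
# or, equivalently, in one call
train, val = split(shuffled, 1:75, 76:100)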
Exported functions and types
Index
DiskDataProviders.ChannelDiskDataProvider
DiskDataProviders.ChannelDiskDataProvider
DiskDataProviders.QueueDiskDataProvider
DiskDataProviders.QueueDiskDataProvider
Base.getindex
Base.split
DiskDataProviders.batchview
DiskDataProviders.buffered
DiskDataProviders.full_batch
DiskDataProviders.labels
DiskDataProviders.sample_input
DiskDataProviders.sample_input
DiskDataProviders.sample_label
DiskDataProviders.start_reading
DiskDataProviders.unbuffered
DiskDataProviders.unbuffered_batchview
MLDataPattern.stratifiedobs
Random.shuffle
Random.shuffle!
DiskDataProviders.ChannelDiskDataProvider — Method.
ChannelDiskDataProvider(d::ChannelDiskDataProvider, inds::AbstractArray)
This constructor can be used to create a dataprovider that is a subset of another.
DiskDataProviders.ChannelDiskDataProvider — Method.
ChannelDiskDataProvider{XT, YT}(xsize, batchsize, queuelength::Int; kwargs...) where {XT, YT}
Constructor for ChannelDiskDataProvider. {XT, YT} are the types of the input and output respectively.
Arguments:
- xsize: tuple with the size of each datapoint
- batchsize: how many datapoints to put in a batch
- queuelength: length of the buffer; it's a good idea to make this some integer multiple of the batch size
- kwargs: set the other fields of the structure
- transform: a function (x, y) -> (x, y) or x -> x that transforms the datapoint before it is put in a batch. This can be used to, e.g., apply preprocessing or normalization.
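As an illustration of the transform keyword (a sketch; normalizer is a placeholder name, and T, batch_size, queue_length, labs and files are as in the usage example):
using Statistics
normalizer(x, y) = ((x .- mean(x)) ./ std(x), y) # hypothetical per-datapoint normalization
dataset = ChannelDiskDataProvider{Vector{Float64}, Int}((T,), batch_size, queue_length;
                                                        labels=labs, files=files, transform=normalizer)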
DiskDataProviders.QueueDiskDataProvider — Method.
QueueDiskDataProvider(d::QueueDiskDataProvider, inds::AbstractArray)
This constructor can be used to create a dataprovider that is a subset of another.
DiskDataProviders.QueueDiskDataProvider — Method.
QueueDiskDataProvider{XT, YT}(xsize, batchsize, queuelength::Int; kwargs...) where {XT, YT}
Constructor for QueueDiskDataProvider. {XT, YT} are the types of the input and output respectively.
Arguments:
- xsize: tuple with the size of each datapoint
- batchsize: how many datapoints to put in a batch
- queuelength: length of the buffer; it's a good idea to make this some integer multiple of the batch size
- kwargs: set the other fields of the structure
- transform: a function (x, y) -> (x, y) or x -> x that transforms the datapoint before it is put in a batch. This can be used to, e.g., apply preprocessing or normalization.
DiskDataProviders.labels — Method.
labels(d)
Return the numeric labels in the dataset, i.e., strings are converted to integers etc.
DiskDataProviders.sample_input — Method.
sample_input(d::AbstractDiskDataProvider, y)
Sample one input with label y from the dataset.
DiskDataProviders.sample_input — Method.
sample_input(d::AbstractDiskDataProvider)
Sample one datapoint from the dataset.
DiskDataProviders.sample_label — Method.
sample_label(d)
Sample a random label from the dataset.
DiskDataProviders.start_reading — Method.
start_reading(d::AbstractDiskDataProvider)
Initialize reading into the buffer. This function has to be called before the dataset is used. Reading will continue until you call stop! on the dataset. If the dataset is a ChannelDiskDataProvider, this is a non-issue.
DiskDataProviders.batchview — Method.
batchview(d::AbstractDiskDataProvider, size=d.batchsize; kwargs...)
Create a batch iterator that iterates batches with the batch size defined at the creation of the DiskDataProvider.
DiskDataProviders.buffered — Method.
buffered(d::AbstractDiskDataProvider)
Creates an iterator which uses the underlying buffer in the dataset.
DiskDataProviders.full_batch — Method.
full_batch(d::AbstractDiskDataProvider)
Returns a matrix with the entire dataset.
DiskDataProviders.unbuffered — Method.
unbuffered(d::AbstractDiskDataProvider)
Creates an iterator which does not use the underlying buffer in the dataset.
DiskDataProviders.unbuffered_batchview — Function.
unbuffered_batchview(d::AbstractDiskDataProvider, size=d.batchsize)
Iterate unbuffered batches. See also batchview.
MLDataPattern.stratifiedobs — Method.
stratifiedobs(d::AbstractDiskDataProvider, p::AbstractFloat, args...; kwargs...)
Partition the data into multiple disjoint subsets proportional to the value(s) of p. The observations are assigned to a data subset using stratified sampling without replacement. These subsets are then returned as a Tuple of subsets, where the first element contains the fraction of observations specified by the first float in p.
For example, if p is a single Float64, the return value is a tuple with two datasets (i.e. subsets), in which the first element contains the fraction of observations specified by p and the second element contains the rest. In the following code, the first subset train will contain around 70% of the observations and the second subset test the rest.
train, test = stratifiedobs(diskdataprovider, 0.7)