Program Listing for File HDF5Output.cpp
↰ Return to documentation for file (src/module/HDF5Output.cpp
)
#ifdef CRPROPA_HAVE_HDF5
#include "crpropa/module/HDF5Output.h"
#include "crpropa/Version.h"
#include "crpropa/Random.h"
#include "kiss/logger.h"
#include <hdf5.h>
#include <cstring>
const hsize_t RANK = 1;
const hsize_t BUFFER_SIZE = 1024 * 16;
namespace crpropa {
// map variant types to H5T_NATIVE
hid_t variantTypeToH5T_NATIVE(Variant::Type type) {
if (type == Variant::TYPE_INT64)
return H5T_NATIVE_INT64;
else if(type == Variant::TYPE_BOOL)
return H5T_NATIVE_HBOOL;
else if(type == Variant::TYPE_CHAR)
return H5T_NATIVE_CHAR;
else if(type == Variant::TYPE_UCHAR)
return H5T_NATIVE_UCHAR;
else if(type == Variant::TYPE_INT16)
return H5T_NATIVE_INT16;
else if(type == Variant::TYPE_UINT16)
return H5T_NATIVE_UINT16;
else if(type == Variant::TYPE_INT32)
return H5T_NATIVE_INT32;
else if(type == Variant::TYPE_UINT32)
return H5T_NATIVE_UINT32;
else if(type == Variant::TYPE_INT64)
return H5T_NATIVE_INT64;
else if(type == Variant::TYPE_UINT64)
return H5T_NATIVE_UINT64;
else if(type == Variant::TYPE_FLOAT)
return H5T_NATIVE_FLOAT;
else if(type == Variant::TYPE_DOUBLE)
return H5T_NATIVE_DOUBLE;
else if(type == Variant::TYPE_STRING)
return H5T_C_S1;
else
{
KISS_LOG_ERROR << "variantTypeToH5T_NATIVE:: Type: " << Variant::getTypeName(type) << " unknown.";
throw std::runtime_error("No matching HDF type for Variant type");
}
}
HDF5Output::HDF5Output() : Output(), filename(), file(-1), sid(-1), dset(-1), dataspace(-1), candidatesSinceFlush(0), flushLimit(std::numeric_limits<unsigned int>::max()) {
}
HDF5Output::HDF5Output(const std::string& filename) : Output(), filename(filename), file(-1), sid(-1), dset(-1), dataspace(-1), candidatesSinceFlush(0), flushLimit(std::numeric_limits<unsigned int>::max()) {
}
HDF5Output::HDF5Output(const std::string& filename, OutputType outputtype) : Output(outputtype), filename(filename), file(-1), sid(-1), dset(-1), dataspace(-1), candidatesSinceFlush(0), flushLimit(std::numeric_limits<unsigned int>::max()) {
outputtype = outputtype;
}
HDF5Output::~HDF5Output() {
close();
}
herr_t HDF5Output::insertStringAttribute(const std::string &key, const std::string &value){
hid_t strtype, attr_space, version_attr;
hsize_t dims = 0;
herr_t status;
strtype = H5Tcopy(H5T_C_S1);
status = H5Tset_size(strtype, value.size());
attr_space = H5Screate_simple(0, &dims, NULL);
version_attr = H5Acreate2(dset, key.c_str(), strtype, attr_space, H5P_DEFAULT, H5P_DEFAULT);
status = H5Awrite(version_attr, strtype, value.c_str());
status = H5Aclose(version_attr);
status = H5Sclose(attr_space);
return status;
}
herr_t HDF5Output::insertDoubleAttribute(const std::string &key, const double &value){
hid_t type, attr_space, version_attr;
hsize_t dims = 0;
herr_t status;
type = H5Tcopy(H5T_NATIVE_DOUBLE);
attr_space = H5Screate_simple(0, &dims, NULL);
version_attr = H5Acreate2(dset, key.c_str(), type, attr_space, H5P_DEFAULT, H5P_DEFAULT);
status = H5Awrite(version_attr, type, &value);
status = H5Aclose(version_attr);
status = H5Sclose(attr_space);
return status;
}
void HDF5Output::open(const std::string& filename) {
file = H5Fcreate(filename.c_str(), H5F_ACC_TRUNC, H5P_DEFAULT, H5P_DEFAULT);
if (file < 0)
throw std::runtime_error(std::string("Cannot create file: ") + filename);
sid = H5Tcreate(H5T_COMPOUND, sizeof(OutputRow));
if (fields.test(TrajectoryLengthColumn))
H5Tinsert(sid, "D", HOFFSET(OutputRow, D), H5T_NATIVE_DOUBLE);
if (fields.test(RedshiftColumn))
H5Tinsert(sid, "z", HOFFSET(OutputRow, z), H5T_NATIVE_DOUBLE);
if (fields.test(SerialNumberColumn))
H5Tinsert(sid, "SN", HOFFSET(OutputRow, SN), H5T_NATIVE_UINT64);
if (fields.test(CurrentIdColumn))
H5Tinsert(sid, "ID", HOFFSET(OutputRow, ID), H5T_NATIVE_INT32);
if (fields.test(CurrentEnergyColumn))
H5Tinsert(sid, "E", HOFFSET(OutputRow, E), H5T_NATIVE_DOUBLE);
if (fields.test(CurrentPositionColumn) && oneDimensional)
H5Tinsert(sid, "X", HOFFSET(OutputRow, X), H5T_NATIVE_DOUBLE);
if (fields.test(CurrentPositionColumn) && not oneDimensional) {
H5Tinsert(sid, "X", HOFFSET(OutputRow, X), H5T_NATIVE_DOUBLE);
H5Tinsert(sid, "Y", HOFFSET(OutputRow, Y), H5T_NATIVE_DOUBLE);
H5Tinsert(sid, "Z", HOFFSET(OutputRow, Z), H5T_NATIVE_DOUBLE);
}
if (fields.test(CurrentDirectionColumn) && not oneDimensional) {
H5Tinsert(sid, "Px", HOFFSET(OutputRow, Px), H5T_NATIVE_DOUBLE);
H5Tinsert(sid, "Py", HOFFSET(OutputRow, Py), H5T_NATIVE_DOUBLE);
H5Tinsert(sid, "Pz", HOFFSET(OutputRow, Pz), H5T_NATIVE_DOUBLE);
}
if (fields.test(SerialNumberColumn))
H5Tinsert(sid, "SN0", HOFFSET(OutputRow, SN0), H5T_NATIVE_UINT64);
if (fields.test(SourceIdColumn))
H5Tinsert(sid, "ID0", HOFFSET(OutputRow, ID0), H5T_NATIVE_INT32);
if (fields.test(SourceEnergyColumn))
H5Tinsert(sid, "E0", HOFFSET(OutputRow, E0), H5T_NATIVE_DOUBLE);
if (fields.test(SourcePositionColumn) && oneDimensional)
H5Tinsert(sid, "X0", HOFFSET(OutputRow, X0), H5T_NATIVE_DOUBLE);
if (fields.test(SourcePositionColumn) && not oneDimensional){
H5Tinsert(sid, "X0", HOFFSET(OutputRow, X0), H5T_NATIVE_DOUBLE);
H5Tinsert(sid, "Y0", HOFFSET(OutputRow, Y0), H5T_NATIVE_DOUBLE);
H5Tinsert(sid, "Z0", HOFFSET(OutputRow, Z0), H5T_NATIVE_DOUBLE);
}
if (fields.test(SourceDirectionColumn) && not oneDimensional) {
H5Tinsert(sid, "P0x", HOFFSET(OutputRow, P0x), H5T_NATIVE_DOUBLE);
H5Tinsert(sid, "P0y", HOFFSET(OutputRow, P0y), H5T_NATIVE_DOUBLE);
H5Tinsert(sid, "P0z", HOFFSET(OutputRow, P0z), H5T_NATIVE_DOUBLE);
}
if (fields.test(SerialNumberColumn))
H5Tinsert(sid, "SN1", HOFFSET(OutputRow, SN1), H5T_NATIVE_UINT64);
if (fields.test(CreatedIdColumn))
H5Tinsert(sid, "ID1", HOFFSET(OutputRow, ID1), H5T_NATIVE_INT32);
if (fields.test(CreatedEnergyColumn))
H5Tinsert(sid, "E1", HOFFSET(OutputRow, E1), H5T_NATIVE_DOUBLE);
if (fields.test(CreatedPositionColumn) && oneDimensional)
H5Tinsert(sid, "X1", HOFFSET(OutputRow, X1), H5T_NATIVE_DOUBLE);
if (fields.test(CreatedPositionColumn) && not oneDimensional) {
H5Tinsert(sid, "X1", HOFFSET(OutputRow, X1), H5T_NATIVE_DOUBLE);
H5Tinsert(sid, "Y1", HOFFSET(OutputRow, Y1), H5T_NATIVE_DOUBLE);
H5Tinsert(sid, "Z1", HOFFSET(OutputRow, Z1), H5T_NATIVE_DOUBLE);
}
if (fields.test(CreatedDirectionColumn) && not oneDimensional) {
H5Tinsert(sid, "P1x", HOFFSET(OutputRow, P1x), H5T_NATIVE_DOUBLE);
H5Tinsert(sid, "P1y", HOFFSET(OutputRow, P1y), H5T_NATIVE_DOUBLE);
H5Tinsert(sid, "P1z", HOFFSET(OutputRow, P1z), H5T_NATIVE_DOUBLE);
}
if (fields.test(WeightColumn))
H5Tinsert(sid, "W", HOFFSET(OutputRow, weight), H5T_NATIVE_DOUBLE);
if (fields.test(CandidateTagColumn))
H5Tinsert(sid, "tag", HOFFSET(OutputRow, tag), H5T_C_S1);
size_t pos = 0;
for(std::vector<Output::Property>::const_iterator iter = properties.begin();
iter != properties.end(); ++iter)
{
hid_t type = variantTypeToH5T_NATIVE((*iter).defaultValue.getType());
if (type == H5T_C_S1)
{ // set size of string field to size of default value!
type = H5Tcopy(H5T_C_S1);
H5Tset_size(type, (*iter).defaultValue.toString().size());
}
H5Tinsert(sid, (*iter).name.c_str(), HOFFSET(OutputRow, propertyBuffer) + pos, type);
pos += (*iter).defaultValue.getSize();
}
if (pos >= propertyBufferSize)
{
KISS_LOG_ERROR << "Using " << pos << " bytes for properties output. Maximum is " << propertyBufferSize << " bytes.";
throw std::runtime_error("Size of property buffer exceeded");
}
// chunked prop
hid_t plist = H5Pcreate(H5P_DATASET_CREATE);
H5Pset_layout(plist, H5D_CHUNKED);
hsize_t chunk_dims[RANK] = {BUFFER_SIZE};
H5Pset_chunk(plist, RANK, chunk_dims);
H5Pset_deflate(plist, 5);
hsize_t dims[RANK] = {0};
hsize_t max_dims[RANK] = {H5S_UNLIMITED};
dataspace = H5Screate_simple(RANK, dims, max_dims);
dset = H5Dcreate2(file, "CRPROPA3", sid, dataspace, H5P_DEFAULT, plist, H5P_DEFAULT);
insertStringAttribute("OutputType", outputName);
insertStringAttribute("Version", g_GIT_DESC);
insertDoubleAttribute("LengthScale", this->lengthScale);
insertDoubleAttribute("EnergyScale", this->energyScale);
// add ranom seeds
std::vector< std::vector<uint32_t> > seeds = Random::getSeedThreads();
for (size_t i = 0; i < seeds.size(); i++)
{
hid_t type, attr_space, version_attr;
herr_t status;
hsize_t dims[] = {1, 0};
dims[1] = seeds[i].size();
type = H5Tarray_create(H5T_NATIVE_ULONG, 2, dims);
attr_space = H5Screate_simple(0, dims, NULL);
char nameBuffer[256];
sprintf(nameBuffer, "SEED_%03lu", i);
KISS_LOG_DEBUG << "Creating HDF5 attribute: " << nameBuffer << " with dimensions " << dims[0] << "x" << dims[1] ;
version_attr = H5Acreate2(dset, nameBuffer, type, attr_space, H5P_DEFAULT, H5P_DEFAULT);
status = H5Awrite(version_attr, type, &seeds[i][0]);
status = H5Aclose(version_attr);
status = H5Sclose(attr_space);
}
H5Pclose(plist);
buffer.reserve(BUFFER_SIZE);
time(&lastFlush);
}
void HDF5Output::close() {
if (file >= 0) {
flush();
H5Dclose(dset);
H5Tclose(sid);
H5Sclose(dataspace);
H5Fclose(file);
file = -1;
}
}
void HDF5Output::process(Candidate* candidate) const {
#pragma omp critical
{
if (file == -1)
// This is ugly, but necesary as otherwise the user has to manually open the
// file before processing the first candidate
const_cast<HDF5Output*>(this)->open(filename);
}
OutputRow r;
r.D = candidate->getTrajectoryLength() / lengthScale;
r.z = candidate->getRedshift();
r.SN = candidate->getSerialNumber();
r.ID = candidate->current.getId();
r.E = candidate->current.getEnergy() / energyScale;
Vector3d v = candidate->current.getPosition() / lengthScale;
r.X = v.x;
r.Y = v.y;
r.Z = v.z;
v = candidate->current.getDirection();
r.Px = v.x;
r.Py = v.y;
r.Pz = v.z;
r.SN0 = candidate->getSourceSerialNumber();
r.ID0 = candidate->source.getId();
r.E0 = candidate->source.getEnergy() / energyScale;
v = candidate->source.getPosition() / lengthScale;
r.X0 = v.x;
r.Y0 = v.y;
r.Z0 = v.z;
v = candidate->source.getDirection();
r.P0x = v.x;
r.P0y = v.y;
r.P0z = v.z;
r.SN1 = candidate->getCreatedSerialNumber();
r.ID1 = candidate->created.getId();
r.E1 = candidate->created.getEnergy() / energyScale;
v = candidate->created.getPosition() / lengthScale;
r.X1 = v.x;
r.Y1 = v.y;
r.Z1 = v.z;
v = candidate->created.getDirection();
r.P1x = v.x;
r.P1y = v.y;
r.P1z = v.z;
r.weight= candidate->getWeight();
r.tag = candidate->getTagOrigin();
size_t pos = 0;
for(std::vector<Output::Property>::const_iterator iter = properties.begin();
iter != properties.end(); ++iter)
{
Variant v;
if (candidate->hasProperty((*iter).name))
{
v = candidate->getProperty((*iter).name);
}
else
{
v = (*iter).defaultValue;
}
pos += v.copyToBuffer(&r.propertyBuffer[pos]);
}
#pragma omp critical
{
const_cast<HDF5Output*>(this)->candidatesSinceFlush++;
Output::process(candidate);
buffer.push_back(r);
if (buffer.size() >= buffer.capacity())
{
KISS_LOG_DEBUG << "HDF5Output: Flush due to buffer capacity exceeded";
flush();
}
else if (candidatesSinceFlush >= flushLimit)
{
KISS_LOG_DEBUG << "HDF5Output: Flush due to number of candidates";
flush();
}
else if (difftime(time(NULL), lastFlush) > 60*10)
{
KISS_LOG_DEBUG << "HDF5Output: Flush due to time exceeded";
flush();
}
}
}
void HDF5Output::flush() const {
const_cast<HDF5Output*>(this)->lastFlush = time(NULL);
const_cast<HDF5Output*>(this)->candidatesSinceFlush = 0;
hsize_t n = buffer.size();
if (n == 0)
return;
hid_t file_space = H5Dget_space(dset);
hsize_t count = H5Sget_simple_extent_npoints(file_space);
// resize dataset
hsize_t new_size[RANK] = {count + n};
H5Dset_extent(dset, new_size);
// get updated filespace
H5Sclose(file_space);
file_space = H5Dget_space(dset);
hsize_t offset[RANK] = {count};
hsize_t cnt[RANK] = {n};
H5Sselect_hyperslab(file_space, H5S_SELECT_SET, offset, NULL, cnt, NULL);
hid_t mspace_id = H5Screate_simple(RANK, cnt, NULL);
H5Dwrite(dset, sid, mspace_id, file_space, H5P_DEFAULT, buffer.data());
H5Sclose(mspace_id);
H5Sclose(file_space);
buffer.clear();
H5Fflush(file, H5F_SCOPE_GLOBAL);
}
std::string HDF5Output::getDescription() const {
return "HDF5Output";
}
void HDF5Output::setFlushLimit(unsigned int N)
{
flushLimit = N;
}
} // namespace crpropa
#endif // CRPROPA_HAVE_HDF5