Source code for opatio.load.load

import struct
from typing import List
import numpy as np

from opatio.base.header import Header
from opatio import OPAT

from opatio.catalog.entry import CardCatalogEntry
from opatio.card.datacard import DataCard
from opatio.card.datacard import CardHeader
from opatio.card.datacard import OPATTable
from opatio.card.datacard import CardIndexEntry

[docs] def read_opat(filename: str) -> OPAT: """ Load an OPAT file. Parameters ---------- filename : str The name of the file to load. Returns ------- OPAT The loaded OPAT object. Examples -------- >>> opat = read_opat("example.opat") >>> print(opat.header) Notes ----- This function reads the header, catalog, and data cards from the OPAT file. """ opat = OPAT() with open(filename, 'rb') as f: headerBytes: bytes = f.read(256) unpackedHeader = struct.unpack("<4s H I I Q 16s 64s 128s H B 23s", headerBytes) loadedHeader = Header( magic = unpackedHeader[0].decode().replace("\x00", ""), version = unpackedHeader[1], numCards = unpackedHeader[2], headerSize = unpackedHeader[3], catalogOffset = unpackedHeader[4], creationDate = unpackedHeader[5].decode().replace("\x00", ""), sourceInfo = unpackedHeader[6].decode().replace("\x00", ""), comment = unpackedHeader[7].decode().replace("\x00", ""), numIndex = unpackedHeader[8], hashPrecision = unpackedHeader[9], reserved = unpackedHeader[10] ) opat.header = loadedHeader f.seek(opat.header.catalogOffset) tableIndices: List[CardCatalogEntry] = [] tableIndexChunkSize = 16 + loadedHeader.numIndex*8 tableIndexFMTString = "<"+"d"*loadedHeader.numIndex+"QQ" while tableIndexEntryBytes := f.read(tableIndexChunkSize): unpackedTableIndexEntry = struct.unpack(tableIndexFMTString, tableIndexEntryBytes) checksum = f.read(32) index = unpackedTableIndexEntry[:loadedHeader.numIndex] tableIndexEntry = CardCatalogEntry( index = index, byteStart = unpackedTableIndexEntry[loadedHeader.numIndex], byteEnd = unpackedTableIndexEntry[loadedHeader.numIndex+1], sha256 = checksum ) tableIndices.append(tableIndexEntry) # Read the card tables for entry in tableIndices: startByte = entry.byteStart endByte = entry.byteEnd f.seek(startByte) cardBytes = f.read(endByte - startByte) card = load_data_card(cardBytes) opat.add_card(entry.index, card) return opat
[docs] def load_data_card(b: bytes) -> DataCard: """ Load a DataCard from bytes. Parameters ---------- b : bytes The bytes to load into a DataCard. Returns ------- DataCard The loaded DataCard object. Examples -------- >>> with open("example.datacard", "rb") as f: ... data = f.read() >>> card = loadDataCard(data) >>> print(card.header) Notes ----- This function parses the header and tables from the provided bytes. This is not intended to be used by the end user; rather, this is a utility function used by load_opat. """ newCard = DataCard() headerUnpacked = struct.unpack("<4s I I Q Q 128s 100s", b[:256]) header = CardHeader( numTables = 0, headerSize = headerUnpacked[2], indexOffset = headerUnpacked[3], cardSize = headerUnpacked[4], comment = headerUnpacked[5].decode().replace("\x00", ""), reserved= headerUnpacked[6] ) newCard.header = header.copy() for indexEntry in range(headerUnpacked[1]): startByte = header.indexOffset + indexEntry*64 indexBytes = b[startByte:startByte+64] unpackedIndexEntry = struct.unpack("<8s Q Q H H 8s 8s Q 12s", indexBytes) tableTag = unpackedIndexEntry[0].decode().replace("\x00", "") tableByteStart = unpackedIndexEntry[1] tableByteEnd = unpackedIndexEntry[2] tableNumColumns = unpackedIndexEntry[3] tableNumRows = unpackedIndexEntry[4] tableColumnName = unpackedIndexEntry[5].decode().replace("\x00", "") tableRowName = unpackedIndexEntry[6].decode().replace("\x00", "") tableCellVectorSize = unpackedIndexEntry[7] tableBytes = b[tableByteStart:tableByteEnd] rawData = struct.unpack(f"<{tableNumRows}d{tableNumColumns}d{tableNumRows*tableNumColumns*tableCellVectorSize}d", tableBytes) rowValues = np.array(rawData[:tableNumRows], dtype=np.float64) columnValues = np.array(rawData[tableNumRows:tableNumRows+tableNumColumns], dtype=np.float64) if tableCellVectorSize > 1: dataArray = np.array(rawData[tableNumRows+tableNumColumns:], dtype=np.float64).reshape((tableNumRows, tableNumColumns, tableCellVectorSize)) else: dataArray = np.array(rawData[tableNumRows+tableNumColumns:], dtype=np.float64).reshape((tableNumRows, tableNumColumns)) newTable = OPATTable(rowValues=rowValues, columnValues=columnValues, data=dataArray) newCard.add_table(tableTag, newTable, columnName=tableColumnName, rowName=tableRowName) return newCard