Class PlainPBStoragePlugin
- java.lang.Object
-
- edu.stanford.slac.archiverappliance.PlainPB.PlainPBStoragePlugin
-
- All Implemented Interfaces:
ETLDest, ETLSource, StorageMetrics, Reader, StoragePlugin, Writer
public class PlainPBStoragePlugin extends Object implements StoragePlugin, ETLSource, ETLDest, StorageMetrics
The plain PB storage plugin stores data in a chunk per PV per partition in sequential form. No index is maintained; simple search algorithms are used to locate events. This plugin has these configuration parameters.
- name
This serves to identify this plugin; mandatory.
- rootFolder
This serves as the rootFolder that is prepended to the path generated for a PV+chunk; mandatory. One can use environment variables here; for example,
pb://localhost?name=STS&rootFolder=${ARCHAPPL_SHORT_TERM_FOLDER}&partitionGranularity=PARTITION_HOUR
where the value for ${ARCHAPPL_SHORT_TERM_FOLDER} is picked up from the environment/system properties.
- partitionGranularity
Defines the time partition granularity for this plugin. For example, if the granularity is PARTITION_HOUR, then a new chunk is created for each hour of data. The partitions are clean; that is, they contain data only for that partition. It is therefore possible to predict which chunk contains data for a particular instant in time and which chunks contain data for a particular time period. This is a mandatory field.
- compress
An optional field that defines the compression mode. Support for zip compression is experimental. If zip compression is used, the rootFolder is prepended with ZIP_PREFIX. If this prefix is absent in the rootFolder, the initialization code adds it automatically.
- hold & gather
hold and gather are optional fields that work together to implement high/low watermarks for data transfer. By default, both hold and gather are 0, which leads to data being transferred out of this plugin as soon as the partition boundary is reached. You can hold a certain number of partitions in this store (perhaps because this store is a high-performing one). In this case, ETL does not start until the first event in this store is older than hold partitions. Once ETL begins, you can transfer gather partitions at a time. For example, hold=5&gather=3 lets you keep at least 5-3=2 partitions in this store. ETL kicks in once the oldest event is older than 5 partitions, and data is moved 3 partitions at a time.
- pp
An optional parameter; this contains a list of post processing operators that are computed and cached during ETL. During retrieval, if an exact match is found, the data from the cached copy is used (greatly improving retrieval performance). Otherwise, the post processor is applied and the data is computed at runtime. To specify multiple post processors, use standard URL syntax, like so: pp=rms&pp=mean_3600
- consolidateOnShutdown
This lets you control whether ETL should push data to the subsequent store on appserver shutdown. This is useful if you are using a RAM disk for the short term store.
- reducedata
An optional parameter; use this parameter to reduce the data as you move it into this store. You can use any of the post processors that can be used with the pp argument. For example, if you define the LTS as pb://localhost?name=LTS&rootFolder=${ARCHAPPL_LONG_TERM_FOLDER}&partitionGranularity=PARTITION_YEAR&reducedata=firstSample_3600, then when moving data into this store, ETL will apply the firstSample_3600 operator on the raw data and store only the reduced data. The difference between this parameter and the pp parameter is that in the reducedata case, only the reduced data is stored; the raw data is thrown away. If you specify both pp and reducedata, you may get unpredictable results, because the raw data is necessary to precompute the caches.
- etlIntoStoreIf
An optional parameter; use this parameter to control whether ETL should move data into this store. If the named flag specified by this parameter is false, this plugin will behave like the blackhole plugin (and you will lose data). Note that named flags are false by default; so the default behavior, if you specify this flag and forget to set the named flag, is to lose data. If you don't set this flag at all, this plugin behaves normally and will accept all the ETL data coming in. For example, if you add etlIntoStoreIf=testFlag, then data will be moved into this store only if the value of the named flag testFlag is true.
- etlOutofStoreIf
An optional parameter; use this parameter to control whether ETL should move data out of this store. If the named flag specified by this parameter is false, this plugin will behave like a bag of holding and accumulate all the data it can. Note that named flags are false by default; so the default behavior, if you specify this flag and forget to set the named flag, is to collect data until you run out of space. If you don't set this flag at all, this plugin behaves normally and will move data out as before. For example, if you add etlOutofStoreIf=testFlag, then data will be moved out of this store only if the value of the named flag testFlag is true.
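As a concrete illustration, the configuration parameters above combine into a single plugin URL. The helper class below is a hypothetical sketch (it is not part of the archiver); only the parameter names and the pb:// scheme come from the documentation above.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical builder that assembles a PlainPB plugin URL from its
// configuration parameters. Parameter names follow the list above.
public class PlainPbUrlBuilder {
    public static String build(Map<String, String> params) {
        StringJoiner query = new StringJoiner("&");
        for (Map.Entry<String, String> e : params.entrySet()) {
            query.add(e.getKey() + "=" + e.getValue());
        }
        return "pb://localhost?" + query;
    }

    public static void main(String[] args) {
        // A medium term store holding 5 partitions and gathering 3 at a time.
        Map<String, String> p = new LinkedHashMap<>();
        p.put("name", "MTS");
        p.put("rootFolder", "${ARCHAPPL_MEDIUM_TERM_FOLDER}");
        p.put("partitionGranularity", "PARTITION_DAY");
        p.put("hold", "5");
        p.put("gather", "3");
        String url = build(p);
        assert url.startsWith("pb://localhost?name=MTS&rootFolder=");
        assert url.endsWith("partitionGranularity=PARTITION_DAY&hold=5&gather=3");
    }
}
```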
- Author:
- mshankar
-
-
Nested Class Summary
- static class PlainPBStoragePlugin.CompressionMode
Support for ZIP_PER_PV is still experimental.
-
Field Summary
- static String APPEND_EXTENSION
- static String PB_EXTENSION
-
Constructor Summary
- PlainPBStoragePlugin()
-
Method Summary
- boolean appendData(BasicContext context, String pvName, EventStream stream)
- boolean appendToETLAppendData(String pvName, EventStream stream, ETLContext context)
This appends an EventStream to the ETL append data for a PV.
- boolean commitETLAppendData(String pvName, ETLContext context)
This concatenates the ETL append data for a PV with the PV's destination data.
- boolean consolidateOnShutdown()
Should ETL move data from this source to the destination on shutdown.
- void convert(BasicContext context, String pvName, ConversionFunction conversionFuntion)
Sometimes, PVs change types, EGUs etc.
- PlainPBStoragePlugin.CompressionMode getCompressionMode()
- List<Callable<EventStream>> getDataForPV(BasicContext context, String pvName, Timestamp startTime, Timestamp endTime)
- List<Callable<EventStream>> getDataForPV(BasicContext context, String pvName, Timestamp startTime, Timestamp endTime, PostProcessor postProcessor)
- String getDesc()
- String getDescription()
Get a string description of this plugin; one that can potentially be used in log messages and provide context.
- List<ETLInfo> getETLStreams(String pvName, Timestamp currentTime, ETLContext context)
Given a pv and a time, this method returns all the streams that are ready for ETL.
- Event getFirstKnownEvent(BasicContext context, String pvName)
Get the first event for this PV.
- int getGatherETLinPartitions()
- int getHoldETLForPartions()
The hold and gather are used to implement a high/low watermark for ETL.
- Event getLastKnownEvent(BasicContext context, String pvName)
Gets the last known event in this destination.
- String getName()
Multiple PVs will probably use the same storage area and we identify the area using the name.
- PartitionGranularity getPartitionGranularity()
- String getRootFolder()
- long getTotalSpace(StorageMetricsContext storageMetricsContext)
Gets the total space on this device.
- String getURLRepresentation()
Return a URL representation of this plugin suitable for parsing by StoragePluginURLParser.
- long getUsableSpace(StorageMetricsContext storageMetricsContext)
Gets the space available to this VM on this device.
- void initialize(String configURL, ConfigService configService)
Each storage plugin is registered to a URI scheme; for example, the PlainStoragePBPlugin uses pb:// as the scheme.
- boolean isBackupFilesBeforeETL()
- void markForDeletion(ETLInfo info, ETLContext context)
Delete the ETLStream identified by info when you can, as it has already been consumed by the ETL destination.
- boolean prepareForNewPartition(String pvName, Event ev, ArchDBRTypes archDBRType, ETLContext context)
This informs the destination that we are switching to a new partition and this dest needs to execute its pre-processing for a new partition.
- void renamePV(BasicContext context, String oldName, String newName)
Change the name of a PV.
- boolean runPostProcessors(String pvName, ArchDBRTypes dbrtype, ETLContext context)
Run the post processors associated with this plugin, if any, for this pv.
- void setBackupFilesBeforeETL(boolean backupFilesBeforeETL)
- void setDesc(String newDesc)
- void setGatherETLinPartitions(int gatherETLinPartitions)
- void setHoldETLForPartions(int holdETLForPartions)
- void setName(String name)
- void setPartitionGranularity(PartitionGranularity partitionGranularity)
- void setRootFolder(String rootFolder)
- long spaceConsumedByPV(String pvName)
Gets an estimate of the space consumed by this PV on this device.
-
-
-
Field Detail
-
PB_EXTENSION
public static final String PB_EXTENSION
- See Also:
- Constant Field Values
-
APPEND_EXTENSION
public static final String APPEND_EXTENSION
- See Also:
- Constant Field Values
-
-
Method Detail
-
getDataForPV
public List<Callable<EventStream>> getDataForPV(BasicContext context, String pvName, Timestamp startTime, Timestamp endTime) throws IOException
- Throws:
IOException
-
getDataForPV
public List<Callable<EventStream>> getDataForPV(BasicContext context, String pvName, Timestamp startTime, Timestamp endTime, PostProcessor postProcessor) throws IOException
- Specified by:
getDataForPV in interface Reader
- Throws:
IOException
-
appendData
public boolean appendData(BasicContext context, String pvName, EventStream stream) throws IOException
- Specified by:
appendData in interface Writer
- Throws:
IOException
-
appendToETLAppendData
public boolean appendToETLAppendData(String pvName, EventStream stream, ETLContext context) throws IOException
Description copied from interface: ETLDest
This appends an EventStream to the ETL append data for a PV.
- Specified by:
appendToETLAppendData in interface ETLDest
- Parameters:
pvName - The name of the PV.
stream - The EventStream to append to the append data for the PV.
context - ETLContext
- Returns:
- boolean True or False
- Throws:
IOException
-
-
getDescription
public String getDescription()
Description copied from interface: StoragePlugin
Get a string description of this plugin; one that can potentially be used in log messages and provide context.
- Specified by:
getDescription in interface ETLDest
- Specified by:
getDescription in interface ETLSource
- Specified by:
getDescription in interface StoragePlugin
- Returns:
- description
-
initialize
public void initialize(String configURL, ConfigService configService) throws IOException
Description copied from interface: StoragePlugin
Each storage plugin is registered to a URI scheme; for example, the PlainStoragePBPlugin uses pb:// as the scheme. Configuration for a storage plugin typically comes in as a URL-like URI.
- The config service identifies the storage plugin using the scheme ("pb" maps to PlainStoragePBPlugin)
- Creates an instance using the default constructor.
- Calls initialize with the complete URL.
- Specified by:
initialize in interface StoragePlugin
- Parameters:
configURL - The complete URL
configService -
- Throws:
IOException
- See Also:
StoragePluginURLParser
-
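The three initialization steps above can be sketched as follows. This is an illustrative toy, not archiver code: the stub interface and class are stand-ins for StoragePlugin and PlainPBStoragePlugin; only the pb:// scheme and the scheme-then-construct-then-initialize sequence come from the description.

```java
import java.net.URI;

// Toy model of the config service's dispatch: extract the URI scheme,
// construct the mapped plugin with its default constructor, then hand
// the plugin the complete URL via initialize().
public class SchemeDispatchSketch {
    interface StoragePluginStub {
        void initialize(String configURL);
    }

    static class PlainPbStub implements StoragePluginStub {
        String configURL;
        public void initialize(String configURL) { this.configURL = configURL; }
    }

    // Step 1: identify the plugin by the URI scheme.
    public static String schemeOf(String configURL) {
        return URI.create(configURL).getScheme();
    }

    public static void main(String[] args) {
        String url = "pb://localhost?name=STS&partitionGranularity=PARTITION_HOUR";
        assert schemeOf(url).equals("pb");      // "pb" would map to PlainPBStoragePlugin
        PlainPbStub plugin = new PlainPbStub(); // step 2: default constructor
        plugin.initialize(url);                 // step 3: initialize with the complete URL
        assert plugin.configURL.equals(url);
    }
}
```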
getURLRepresentation
public String getURLRepresentation()
Return a URL representation of this plugin suitable for parsing by StoragePluginURLParser.
- Returns:
- A URL representation
-
setRootFolder
public void setRootFolder(String rootFolder) throws IOException
- Throws:
IOException
-
setDesc
public void setDesc(String newDesc)
-
getRootFolder
public String getRootFolder()
-
getDesc
public String getDesc()
-
getPartitionGranularity
public PartitionGranularity getPartitionGranularity()
- Specified by:
getPartitionGranularity in interface ETLDest
- Specified by:
getPartitionGranularity in interface ETLSource
-
setPartitionGranularity
public void setPartitionGranularity(PartitionGranularity partitionGranularity)
-
getETLStreams
public List<ETLInfo> getETLStreams(String pvName, Timestamp currentTime, ETLContext context) throws IOException
Description copied from interface: ETLSource
Given a pv and a time, this method returns all the streams that are ready for ETL. For example, if the partition granularity of a source is an hour, then this method returns all the streams in this source for the previous hours. Ideally, these streams must be closed for writing and should not change. The ETL process will consolidate these streams into the ETL destination, which is expected to be at a longer time granularity.
- Specified by:
getETLStreams in interface ETLSource
- Parameters:
pvName - The name of the PV.
currentTime - The time that is being used as the cutoff. If we pass in a timestamp way out into the future, we should return all the streams available.
context - ETLContext
- Returns:
- List of ETLInfo
- Throws:
IOException
-
-
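The "ready for ETL" rule above can be illustrated for hourly granularity: a chunk qualifies when its hour partition is strictly earlier than the partition containing the cutoff time. The class below is a hypothetical sketch of that comparison, not archiver code.

```java
import java.time.Instant;

// Toy illustration of the partition cutoff test described above,
// assuming PARTITION_HOUR granularity.
public class EtlReadySketch {
    static final long HOUR_SECONDS = 3600;

    // Hour-partition index of an instant (hours since the epoch).
    public static long hourPartition(Instant t) {
        return t.getEpochSecond() / HOUR_SECONDS;
    }

    // A chunk is ready when its partition is strictly before the cutoff's.
    public static boolean readyForEtl(Instant chunkStart, Instant currentTime) {
        return hourPartition(chunkStart) < hourPartition(currentTime);
    }

    public static void main(String[] args) {
        Instant now = Instant.parse("2024-01-01T10:30:00Z");
        assert readyForEtl(Instant.parse("2024-01-01T09:15:00Z"), now);  // previous hour
        assert !readyForEtl(Instant.parse("2024-01-01T10:05:00Z"), now); // same, still-open hour
    }
}
```

A cutoff "way out into the future", as the currentTime parameter note says, makes every existing partition strictly earlier, so all streams are returned.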
markForDeletion
public void markForDeletion(ETLInfo info, ETLContext context)
Description copied from interface: ETLSource
Delete the ETLStream identified by info when you can, as it has already been consumed by the ETL destination. You can delete it later or immediately.
- Specified by:
markForDeletion in interface ETLSource
- Parameters:
info - ETLInfo
context - ETLContext
-
getLastKnownEvent
public Event getLastKnownEvent(BasicContext context, String pvName) throws IOException
Description copied from interface: Writer
Gets the last known event in this destination. Future events will be appended to this destination only if their timestamp is more recent than the timestamp of this event. If there is no last known event, then a null is returned.
- Specified by:
getLastKnownEvent in interface Writer
- Parameters:
context -
pvName - The PV name
- Returns:
- Event The last known event of pvName
- Throws:
IOException
-
-
getFirstKnownEvent
public Event getFirstKnownEvent(BasicContext context, String pvName) throws IOException
Description copied from interface: Reader
Get the first event for this PV. This call is used to optimize away calls to other readers that have older data.
- Specified by:
getFirstKnownEvent in interface Reader
- Parameters:
context -
pvName - The PV name
- Returns:
- Event The first event of pvName
- Throws:
IOException
-
-
prepareForNewPartition
public boolean prepareForNewPartition(String pvName, Event ev, ArchDBRTypes archDBRType, ETLContext context) throws IOException
Description copied from interface: ETLDest
This informs the destination that we are switching to a new partition and this dest needs to execute its pre-processing for a new partition. For example, in a PlainPBStorage plugin, this will close the previous file output streams, if any, and open a new stream to the file backing the new partition, writing a header if needed.
- Specified by:
prepareForNewPartition in interface ETLDest
- Parameters:
pvName - The name of the PV.
ev - This is used to determine the new partition
archDBRType - ArchDBRTypes
context - ETLContext
- Returns:
- boolean True or False
- Throws:
IOException
-
-
commitETLAppendData
public boolean commitETLAppendData(String pvName, ETLContext context) throws IOException
Description copied from interface: ETLDest
This concatenates the ETL append data for a PV with the PV's destination data.
- Specified by:
commitETLAppendData in interface ETLDest
- Parameters:
pvName - The name of the PV.
context - ETLContext
- Returns:
- boolean True or False
- Throws:
IOException
-
-
runPostProcessors
public boolean runPostProcessors(String pvName, ArchDBRTypes dbrtype, ETLContext context) throws IOException
Description copied from interface: ETLDest
Run the post processors associated with this plugin, if any, for this pv. The post processing is done after the commit and outside of the ETL transaction. This process is expected to catch up on previously missed/incomplete computation of cached post processing files. There are at least two use cases for this: one where we decide to go back and add a post processor for a pv, and one where we change the algorithm for the post processor and want to recompute all the cached files again.
- Specified by:
runPostProcessors in interface ETLDest
- Parameters:
pvName - The name of the PV.
dbrtype - ArchDBRTypes
context - ETLContext
- Returns:
- boolean True or False
- Throws:
IOException
-
-
isBackupFilesBeforeETL
public boolean isBackupFilesBeforeETL()
-
setBackupFilesBeforeETL
public void setBackupFilesBeforeETL(boolean backupFilesBeforeETL)
-
getHoldETLForPartions
public int getHoldETLForPartions()
The hold and gather are used to implement a high/low watermark for ETL. ETL is skipped until the first known event in the partitions available for ETL is earlier than hold partitions. Once this is true, we then include in the ETL list all partitions whose first event is earlier than hold - gather partitions. For example, with PARTITION_DAY granularity, if you want to run ETL once every 7 days, but when you run you want to move 5 days' worth of data to the dest, set hold to 7 and gather to 5. Hold and gather default to a scenario where we aggressively push data to the destination as soon as it is available.
- Returns:
- holdETLForPartions
-
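The watermark rule above reduces to two comparisons on partition age. The sketch below is a hedged illustration of that rule only (it is not archiver code; method and parameter names are invented): ETL triggers once the oldest data is more than hold partitions old, and each run then moves every partition more than hold - gather partitions old.

```java
// Toy model of the hold/gather high/low watermark described above.
// Ages are measured in whole partitions relative to now.
public class HoldGatherSketch {
    // High watermark: skip ETL until the oldest partition exceeds `hold`.
    public static boolean etlTriggered(int oldestPartitionAge, int hold) {
        return oldestPartitionAge > hold;
    }

    // Low watermark: once triggered, move partitions older than `hold - gather`.
    public static boolean partitionMoved(int partitionAge, int hold, int gather) {
        return partitionAge > hold - gather;
    }

    public static void main(String[] args) {
        // hold=7, gather=5 (the PARTITION_DAY example above):
        assert !etlTriggered(7, 7);          // 7 days old: not yet
        assert etlTriggered(8, 7);           // 8 days old: ETL kicks in
        assert partitionMoved(3, 7, 5);      // older than 7-5=2: moved
        assert !partitionMoved(2, 7, 5);     // 2 days old: held back
    }
}
```

So each run transfers roughly gather partitions and always leaves at least hold - gather partitions behind in this store.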
setHoldETLForPartions
public void setHoldETLForPartions(int holdETLForPartions) throws IOException
- Throws:
IOException
-
getGatherETLinPartitions
public int getGatherETLinPartitions()
-
setGatherETLinPartitions
public void setGatherETLinPartitions(int gatherETLinPartitions) throws IOException
- Throws:
IOException
-
getName
public String getName()
Description copied from interface: StoragePlugin
Multiple PVs will probably use the same storage area, and we identify the area using the name. This is principally used in capacity planning/load balancing to identify the storage area for the PV. We should make sure that storages with similar lifetimes have the same name in all the appliances. The name is also used to identify the storage in the storage report. For example, the PlainPBStoragePlugin takes a name parameter, and we should use something like STS as the identity for the short term store in all the appliances.
- Specified by:
getName in interface StorageMetrics
- Specified by:
getName in interface StoragePlugin
- Returns:
- name
-
getTotalSpace
public long getTotalSpace(StorageMetricsContext storageMetricsContext) throws IOException
Description copied from interface: StorageMetrics
Gets the total space on this device.
- Specified by:
getTotalSpace in interface StorageMetrics
- Parameters:
storageMetricsContext - StorageMetricsContext
- Returns:
- The total space
- Throws:
IOException
-
-
getUsableSpace
public long getUsableSpace(StorageMetricsContext storageMetricsContext) throws IOException
Description copied from interface: StorageMetrics
Gets the space available to this VM on this device.
- Specified by:
getUsableSpace in interface StorageMetrics
- Parameters:
storageMetricsContext - StorageMetricsContext
- Returns:
- The usable space
- Throws:
IOException
-
-
spaceConsumedByPV
public long spaceConsumedByPV(String pvName) throws IOException
Description copied from interface: StorageMetrics
Gets an estimate of the space consumed by this PV on this device.
- Specified by:
spaceConsumedByPV in interface StorageMetrics
- Parameters:
pvName - The name of the PV.
- Returns:
- The space consumed by the PV
- Throws:
IOException
-
-
setName
public void setName(String name)
-
getCompressionMode
public PlainPBStoragePlugin.CompressionMode getCompressionMode()
-
consolidateOnShutdown
public boolean consolidateOnShutdown()
Description copied from interface: ETLSource
Should ETL move data from this source to the destination on shutdown? For example, if you are using a ramdisk for the STS and you have a UPS, you can minimize any data loss by turning this bit on for data stores that are on the ramdisk. On shutdown, ETL will try to move the data out of this store into the next lifetime store.
- Specified by:
consolidateOnShutdown in interface ETLSource
- Returns:
- boolean True or False
-
renamePV
public void renamePV(BasicContext context, String oldName, String newName) throws IOException
Description copied from interface: StoragePlugin
Change the name of a PV. This happens occasionally in the EPICS world when people change the names of PVs but want to retain the data. This method is used to change the name of the PV in any of the datasets for the PV oldName. For example, in PB files, the name of the PV is encoded in the file names and is also stored in the header. In this case, we expect the plugin to move the data to new file names and change the PV name in the file header. To avoid getting into issues about data changing when renaming files, the PV can be assumed to be in a paused state.
- Specified by:
renamePV in interface StoragePlugin
- Parameters:
context -
oldName - The old PV name
newName - The new PV name
IOException
-
-
convert
public void convert(BasicContext context, String pvName, ConversionFunction conversionFuntion) throws IOException
Description copied from interface: StoragePlugin
Sometimes, PVs change types, EGUs etc. In these cases, we are left with the problem of what to do with the already archived data. We can rename the PV to a new but related name - this keeps the existing data as is. Or, we can attempt to convert to the new type, EGU etc. This method can be used to convert the existing data using the supplied conversion function. Conversions should be all or nothing; that is, first convert all the streams into temporary chunks and then do a bulk rename once all the conversions have succeeded. Note that we'll also be using the same conversion mechanism for imports and other functions that change data. So, when/if implementing the conversion function, make sure we respect the typical expectations within the archiver - monotonically increasing timestamps and so on. To avoid getting into issues about data changing when converting, the PV can be assumed to be in a paused state.
- Specified by:
convert in interface StoragePlugin
- Parameters:
context -
pvName - The PV name
conversionFuntion -
- Throws:
IOException
-
-
-