Class MergeDedupStoragePlugin

  • All Implemented Interfaces:
    ETLDest, ETLSource, StorageMetrics, Reader, StoragePlugin, Writer

    public class MergeDedupStoragePlugin
    extends Object
    implements StoragePlugin, ETLSource, ETLDest, StorageMetrics
    The MergeDedupStoragePlugin is primarily meant for achieving a small amount of failover in the archiving of a PV. The scheme is to have another appliance also archive the same PV and then merge the data from that appliance into this one during ETL. This appliance is the dest appliance; it is the appliance the data is merged into. The other appliance is an independent EPICS archiver appliance archiving the same PV; that is, it is not part of this cluster of appliances. There are no special requirements for the other appliance other than that it should archive the same PV (and, of course, run a reasonably similar version of the software). No calls are made to the other appliance to clean up any data after consolidation; so, for convenience, the other appliance can be configured with a BlackholeStoragePlugin to automatically delete data after a certain time. The MergeDedupStoragePlugin has two StoragePlugin parameters in its configuration.
    • The dest parameter configures the data store in this appliance.
    • The other parameter points to the backup appliance using the data_retrieval_url of the other appliance.
    The other appliance also acts as an ETL gating point; that is, we do not move the data out of this data store unless we get a valid response from the other appliance when we fetch data during ETL. So, ETL for this PV in this appliance will stop if you bring down the other appliance. It will automatically resume and continue from where it left off once you bring the other appliance back up. Note that data is merged into this appliance only during ETL. So, if you have data that is outside the ETL window (for example, after some complex combination of pause/resume operations), you'll have to manually merge the data in. This can be done using the mergeInData BPL call; the PV needs to be paused for this purpose.
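    As a rough illustration of such a manual merge, the sketch below pauses the PV and then invokes the mergeInData BPL over HTTP. The pauseArchivingPV endpoint and the default mgmt URL are standard; the exact path and parameters of the mergeInData call are assumptions here and should be checked against the mgmt BPL documentation.

        import java.net.URI;
        import java.net.URLEncoder;
        import java.net.http.HttpClient;
        import java.net.http.HttpRequest;
        import java.net.http.HttpResponse;
        import java.nio.charset.StandardCharsets;

        public class ManualMergeSketch {
            public static void main(String[] args) throws Exception {
                String mgmt = "http://localhost:17665/mgmt/bpl";  // default mgmt BPL URL
                String pv = URLEncoder.encode("SOME:PV:NAME", StandardCharsets.UTF_8);  // hypothetical PV
                HttpClient client = HttpClient.newHttpClient();

                // The PV must be paused before data can be merged in.
                client.send(HttpRequest.newBuilder(URI.create(mgmt + "/pauseArchivingPV?pv=" + pv)).build(),
                        HttpResponse.BodyHandlers.ofString());

                // Assumed endpoint and parameters for the mergeInData BPL call.
                HttpResponse<String> resp = client.send(
                        HttpRequest.newBuilder(URI.create(mgmt + "/mergeInData?pv=" + pv)).build(),
                        HttpResponse.BodyHandlers.ofString());
                System.out.println(resp.body());
            }
        }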
    As an example, assume that you wish to merge in the data when you move data from the MTS to the LTS; then you define your MTS in this appliance as a MergeDedupStoragePlugin. If, for example,
    • you use pb://localhost?name=MTS&rootFolder=${ARCHAPPL_MEDIUM_TERM_FOLDER}&partitionGranularity=PARTITION_DAY&hold=2&gather=1 as your regular MTS.
    • the data_retrieval_url for the other appliance is http://localhost:17669/retrieval
    then we'd define the MTS for this PV in this appliance as
    merge://localhost?name=MTS
    &dest=pb%3A%2F%2Flocalhost%3Fname%3DMTS%26rootFolder%3D%24%7BARCHAPPL_MEDIUM_TERM_FOLDER%7D%26partitionGranularity%3DPARTITION_DAY%26hold%3D2%26gather%3D1
    &other=pbraw%3A%2F%2Flocalhost%3Fname%3DMTS%26rawURL%3Dhttp%253A%252F%252Flocalhost%253A17669%252Fretrieval%252Fdata%252FgetData.raw
    where both the dest and other parameters are URL-encoded (a sketch for generating these encoded values programmatically follows the list below).
    • dest is the URL-encoded version of pb://localhost?name=MTS&rootFolder=${ARCHAPPL_MEDIUM_TERM_FOLDER}&partitionGranularity=PARTITION_DAY&hold=2&gather=1
    • other is the URL-encoded version of pbraw://localhost?name=MTS&rawURL=http%3A%2F%2Flocalhost%3A17669%2Fretrieval%2Fdata%2FgetData.raw
      • where rawURL is in turn the URL-encoded version of http://localhost:17669/retrieval/data/getData.raw
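    Because of the nesting, dest is encoded once, other is encoded once, and the rawURL inside other ends up encoded twice, so it is easy to get these values wrong by hand. They can instead be generated with the standard java.net.URLEncoder; a minimal sketch that reproduces the merge:// URL from this example:

        import java.net.URLEncoder;
        import java.nio.charset.StandardCharsets;

        public class BuildMergeURL {
            public static void main(String[] args) {
                String dest = "pb://localhost?name=MTS&rootFolder=${ARCHAPPL_MEDIUM_TERM_FOLDER}"
                        + "&partitionGranularity=PARTITION_DAY&hold=2&gather=1";
                // The rawURL is itself URL-encoded before being embedded in the other plugin's URL.
                String rawURL = "http://localhost:17669/retrieval/data/getData.raw";
                String other = "pbraw://localhost?name=MTS&rawURL="
                        + URLEncoder.encode(rawURL, StandardCharsets.UTF_8);
                String merge = "merge://localhost?name=MTS"
                        + "&dest=" + URLEncoder.encode(dest, StandardCharsets.UTF_8)
                        + "&other=" + URLEncoder.encode(other, StandardCharsets.UTF_8);
                System.out.println(merge);
            }
        }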
    From an implementation perspective, this can be understood as a plugin that delegates almost all calls to the dest plugin. Calls that fetch data out of this plugin are merged/deduped with data retrieved from the other plugin.
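    The merge/dedup itself is conceptually a merge of two time-ordered event sequences in which events with duplicate timestamps are emitted only once. A simplified sketch of that idea, using plain longs as stand-ins for event timestamps (the actual plugin operates on EventStreams):

        import java.util.ArrayList;
        import java.util.List;

        public class MergeDedupSketch {
            /** Merge two ascending timestamp sequences, keeping one event per timestamp. */
            static List<Long> mergeDedup(List<Long> dest, List<Long> other) {
                List<Long> merged = new ArrayList<>();
                int i = 0, j = 0;
                while (i < dest.size() || j < other.size()) {
                    if (j >= other.size() || (i < dest.size() && dest.get(i) <= other.get(j))) {
                        long ts = dest.get(i++);
                        // Drop the other stream's event if it carries the same timestamp.
                        if (j < other.size() && other.get(j) == ts) j++;
                        merged.add(ts);
                    } else {
                        merged.add(other.get(j++));
                    }
                }
                return merged;
            }

            public static void main(String[] args) {
                // Prints [1, 2, 3, 5, 6]; the duplicate timestamp 3 appears only once.
                System.out.println(mergeDedup(List.of(1L, 3L, 5L), List.of(2L, 3L, 6L)));
            }
        }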
    Author:
    mshankar
    • Constructor Detail

      • MergeDedupStoragePlugin

        public MergeDedupStoragePlugin()
    • Method Detail

      • getETLStreams

        public List<ETLInfo> getETLStreams(String pv,
                                           Timestamp currentTime,
                                           ETLContext context)
                                    throws IOException
        Description copied from interface: ETLSource
        Given a pv and a time, this method returns all the streams that are ready for ETL. For example, if the partition granularity of a source is an hour, then this method returns all the streams that are in this source for the previous hours. Ideally, these streams must be closed for writing and should not change. The ETL process will consolidate these streams into the ETL destination, which is expected to be at a longer time granularity.
        Specified by:
        getETLStreams in interface ETLSource
        Parameters:
        pv - The name of the PV.
        currentTime - The time that is being used as the cutoff. If we pass in a timestamp way out into the future, we should return all the streams available.
        context - ETLContext
        Returns:
        List of ETLInfo objects, one per stream that is ready for ETL
        Throws:
        IOException -  
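        As a hedged usage sketch: passing a timestamp far out into the future asks for everything available. ETLInfo and ETLContext are the appliance's own types, and the ETLContext is normally supplied by the ETL framework, so the caller is assumed to already have one.

          import java.io.IOException;
          import java.sql.Timestamp;
          import java.time.Instant;
          import java.time.temporal.ChronoUnit;
          import java.util.List;
          // Appliance imports (MergeDedupStoragePlugin, ETLInfo, ETLContext) elided.

          class ListETLStreamsSketch {
              static void listStreams(MergeDedupStoragePlugin plugin, String pvName,
                                      ETLContext context) throws IOException {
                  // A far-future cutoff returns all streams available for ETL.
                  Timestamp farFuture = Timestamp.from(Instant.now().plus(3650, ChronoUnit.DAYS));
                  List<ETLInfo> ready = plugin.getETLStreams(pvName, farFuture, context);
                  System.out.println(pvName + ": " + ready.size() + " stream(s) ready for ETL");
              }
          }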
      • getFirstKnownEvent

        public Event getFirstKnownEvent(BasicContext context,
                                        String pvName)
                                 throws IOException
        Description copied from interface: Reader
        Get the first event for this PV. This call is used to optimize away calls to other readers that have older data.
        Specified by:
        getFirstKnownEvent in interface Reader
        Parameters:
        context -  
        pvName - The PV name
        Returns:
        Event The first event of pvName
        Throws:
        IOException -  
      • getLastKnownEvent

        public Event getLastKnownEvent(BasicContext context,
                                       String pvName)
                                throws IOException
        Description copied from interface: Writer
        Gets the last known event in this destination. Future events will be appended to this destination only if their timestamp is more recent than the timestamp of this event. If there is no last known event, then a null is returned.
        Specified by:
        getLastKnownEvent in interface Writer
        Parameters:
        context -  
        pvName - The PV name
        Returns:
        Event The last known event of pvName
        Throws:
        IOException -  
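        The append-only contract above reduces to a simple guard; a minimal sketch, assuming the appliance's Event interface exposes its timestamp via getEventTimeStamp():

          import java.io.IOException;
          // Appliance imports (Writer, BasicContext, Event) elided.

          class AppendGuardSketch {
              // Accept a new event only if the destination is empty or the event
              // is strictly newer than the last known event.
              static boolean shouldAppend(Writer dest, BasicContext context, String pvName,
                                          Event candidate) throws IOException {
                  Event last = dest.getLastKnownEvent(context, pvName);
                  return last == null
                          || candidate.getEventTimeStamp().after(last.getEventTimeStamp());
              }
          }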
      • prepareForNewPartition

        public boolean prepareForNewPartition(String pvName,
                                              Event ev,
                                              ArchDBRTypes archDBRType,
                                              ETLContext context)
                                       throws IOException
        Description copied from interface: ETLDest
        This informs the destination that we are switching to a new partition and this dest needs to execute its pre-processing for a new partition. For example, in the PlainPBStoragePlugin, this will close the previous file output streams, if any, and open a new stream to the file backing the new partition, writing a header if needed.
        Specified by:
        prepareForNewPartition in interface ETLDest
        Parameters:
        pvName - The name of the PV.
        ev - The event used to determine the new partition
        archDBRType - ArchDBRTypes
        context - ETLContext
        Returns:
        boolean True or False
        Throws:
        IOException -  
      • appendToETLAppendData

        public boolean appendToETLAppendData(String pvName,
                                             EventStream stream,
                                             ETLContext context)
                                      throws IOException
        Description copied from interface: ETLDest
        This appends an EventStream to the ETL append data for a PV.
        Specified by:
        appendToETLAppendData in interface ETLDest
        Parameters:
        pvName - The name of the PV.
        stream - The EventStream to append to the append data for a PV.
        context - ETLContext
        Returns:
        boolean True or False
        Throws:
        IOException -  
      • commitETLAppendData

        public boolean commitETLAppendData(String pvName,
                                           ETLContext context)
                                    throws IOException
        Description copied from interface: ETLDest
        This concatenates the ETL append data for a PV with the PV's destination data.
        Specified by:
        commitETLAppendData in interface ETLDest
        Parameters:
        pvName - The name of the PV.
        context - ETLContext
        Returns:
        boolean True or False
        Throws:
        IOException -  
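        Taken together, prepareForNewPartition, appendToETLAppendData and commitETLAppendData form the destination half of an ETL move. A rough sketch of the sequence a caller (normally the ETL framework itself) follows, with error handling elided; Event, EventStream, ArchDBRTypes and ETLContext are the appliance's own types, and firstEvent is assumed to be the first event of the stream being moved:

          import java.io.IOException;
          // Appliance imports (ETLDest, Event, EventStream, ArchDBRTypes, ETLContext) elided.

          class ETLMoveSketch {
              static void moveStream(ETLDest dest, String pvName, EventStream stream,
                                     Event firstEvent, ArchDBRTypes dbrType,
                                     ETLContext context) throws IOException {
                  // 1. Let the destination set up the partition that will receive the data.
                  dest.prepareForNewPartition(pvName, firstEvent, dbrType, context);
                  // 2. Stage the incoming events as append data for the PV.
                  dest.appendToETLAppendData(pvName, stream, context);
                  // 3. Concatenate the staged append data with the PV's destination data.
                  dest.commitETLAppendData(pvName, context);
              }
          }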
      • runPostProcessors

        public boolean runPostProcessors(String pvName,
                                         ArchDBRTypes dbrtype,
                                         ETLContext context)
                                  throws IOException
        Description copied from interface: ETLDest
        Run the post processors, if any, associated with this plugin for this PV. The post processing is done after the commit and outside of the ETL transaction. This process is expected to catch up on previously missed/incomplete computation of cached post processing files. There are at least two use cases for this - one where we decide to go back and add a post processor for a PV, and one where we change the algorithm for the post processor and want to recompute all the cached files again.
        Specified by:
        runPostProcessors in interface ETLDest
        Parameters:
        pvName - The name of the PV.
        dbrtype - ArchDBRTypes
        context - ETLContext
        Returns:
        boolean True or False
        Throws:
        IOException -  
      • markForDeletion

        public void markForDeletion(ETLInfo info,
                                    ETLContext context)
        Description copied from interface: ETLSource
        Delete the ETL stream identified by info when you can, as it has already been consumed by the ETL destination. You can delete it later or immediately.
        Specified by:
        markForDeletion in interface ETLSource
        Parameters:
        info - ETLInfo
        context - ETLContext
      • consolidateOnShutdown

        public boolean consolidateOnShutdown()
        Description copied from interface: ETLSource
        Should ETL move data from this source to the destination on shutdown? For example, if you are using a ramdisk for the STS and you have a UPS, you can minimize any data loss by turning this bit on for data stores that are on the ramdisk. On shutdown, ETL will try to move the data out of this store into the next lifetime store.
        Specified by:
        consolidateOnShutdown in interface ETLSource
        Returns:
        boolean True or False
      • getName

        public String getName()
        Description copied from interface: StoragePlugin
        Multiple PVs will probably use the same storage area and we identify the area using the name. This is principally used in capacity planning/load balancing to identify the storage area for the PV. We should make sure that stores with similar lifetimes have the same name in all the appliances. The name is also used to identify the storage in the storage report. For example, the PlainPBStoragePlugin takes a name parameter and we should use something like STS as the identity for the short term store in all the appliances.
        Specified by:
        getName in interface StorageMetrics
        Specified by:
        getName in interface StoragePlugin
        Returns:
        name
      • initialize

        public void initialize(String configURL,
                               ConfigService configService)
                        throws IOException
        Description copied from interface: StoragePlugin
        Each storage plugin is registered to a URI scheme; for example, the PlainPBStoragePlugin uses pb:// as the scheme. Configuration for a storage plugin typically comes in as a URL-like URI.
        1. The config service identifies the storage plugin using the scheme ("pb" maps to PlainPBStoragePlugin)
        2. Creates an instance using the default constructor.
        3. Calls initialize with the complete URL.
        The storage plugin is expected to use the parameters in the URL to initialize itself.
        Specified by:
        initialize in interface StoragePlugin
        Parameters:
        configURL - The complete URL
        configService -  
        Throws:
        IOException -  
        See Also:
        StoragePluginURLParser
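        Given the default constructor above, the same three steps can be followed by hand; a minimal sketch, where mergeURL is a merge:// URL like the one in the class description and a ConfigService instance is assumed to be available:

          import java.io.IOException;
          // Appliance imports (MergeDedupStoragePlugin, ConfigService) elided.

          class PluginBootstrapSketch {
              static MergeDedupStoragePlugin create(String mergeURL, ConfigService configService)
                      throws IOException {
                  // Normally done by the config service: scheme lookup, default
                  // constructor, then initialize with the complete URL.
                  MergeDedupStoragePlugin plugin = new MergeDedupStoragePlugin();
                  plugin.initialize(mergeURL, configService);
                  return plugin;
              }
          }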
      • renamePV

        public void renamePV(BasicContext context,
                             String oldName,
                             String newName)
                      throws IOException
        Description copied from interface: StoragePlugin
        Change the name of a PV. This happens occasionally in the EPICS world when people change the names of PVs but want to retain the data. This method is used to change the name of the PV in any of the datasets for PV oldName. For example, in PB files, the name of the PV is encoded in the file names and is also stored in the header. In this case, we expect the plugin to move the data to new file names and change the PV name in the file header. To avoid getting into issues about data changing when renaming files, the PV can be assumed to be in a paused state.
        Specified by:
        renamePV in interface StoragePlugin
        Parameters:
        context -  
        oldName - The old PV name
        newName - The new PV name
        Throws:
        IOException -  
      • convert

        public void convert(BasicContext context,
                            String pvName,
                            ConversionFunction conversionFuntion)
                     throws IOException
        Description copied from interface: StoragePlugin
        Sometimes, PVs change types, EGUs etc. In these cases, we are left with the problem of what to do with the already archived data. We can rename the PV to a new but related name - this keeps the existing data as is. Or, we can attempt to convert to the new type, EGU etc. This method can be used to convert the existing data using the supplied conversion function. Conversions should be all or nothing; that is, first convert all the streams into temporary chunks and then do a bulk rename once all the conversions have succeeded. Note that we'll also be using the same conversion mechanism for imports and other functions that change data. So, when/if implementing the conversion function, make sure we respect the typical expectations within the archiver - monotonically increasing timestamps and so on. To avoid getting into issues about data changing when converting, the PV can be assumed to be in a paused state.
        Specified by:
        convert in interface StoragePlugin
        Parameters:
        context -  
        pvName - The PV name
        conversionFuntion -  
        Throws:
        IOException -