The datastore object comes pre-initialized in all Shooju Processors. It provides the interface for modifying data in Shooju, similar to shooju.RemoteJob in the Python client library.
Attributes
datastore.importer_id
Holds the processor ID as a str.
datastore.job_id
If the processor is running under a Processors job, this attribute holds the job ID as an int. If no job is running, it returns None.
datastore.batch_size
Holds the processor batch size as an int. Can be assigned within the processor code before a call to put_point to change the batch size.
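As a sketch of how this might be used, the snippet below lowers the batch size before queuing points. The _Datastore class and its default batch size are hypothetical stand-ins for the pre-initialized object, included only so the example is self-contained:

```python
# Minimal stand-in for the pre-initialized processor datastore
# (illustrative only; the default batch size below is assumed).
class _Datastore:
    def __init__(self):
        self.batch_size = 5000   # assumed default, not documented here
        self._queue = []

    def put_point(self, series_id, point):
        self._queue.append((series_id, point))

datastore = _Datastore()
datastore.batch_size = 500          # must be set before put_point calls
datastore.put_point('source\\my_series', (20240101, 1.0))
```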
Methods for writing to series
datastore.put_point(series_id: str, point: Point)
Method to write a single point to a series. Point is the same as the shooju.Point provided by the Python client library.
Does not write immediately.
datastore.put_points(series_id: str, points: List[Point])
Same as above, but writes multiple points.
Does not write immediately.
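A sketch of queuing several points at once. Point and put_points below are stand-ins; in a real processor Point comes from the shooju client library and datastore is pre-initialized:

```python
from datetime import datetime

# Stand-in Point mirroring the (date, value) shape of shooju.Point.
class Point:
    def __init__(self, date, value):
        self.date, self.value = date, value

queued = {}

# Stand-in for datastore.put_points: queues, does not write immediately.
def put_points(series_id, points):
    queued.setdefault(series_id, []).extend(points)

points = [Point(datetime(2024, 1, day), float(day)) for day in (1, 2, 3)]
put_points('source\\my_series', points)
```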
datastore.put_reported_points(series_id: str, reported_date: datetime, points: List[Point])
Writes points to a series to be available as "reported points" at the given reported_date.
Does not write immediately.
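The idea can be sketched with a stand-in: each call stores a snapshot of the points keyed by reported_date, so one series can carry several "as reported" versions (names and data shapes here are illustrative, not the real implementation):

```python
from datetime import datetime

reported = {}  # series_id -> {reported_date: points}

# Stand-in for datastore.put_reported_points.
def put_reported_points(series_id, reported_date, points):
    reported.setdefault(series_id, {})[reported_date] = points

# The same underlying date gets a revised value in a later report.
put_reported_points('source\\s1', datetime(2024, 1, 15), [(20240101, 1.0)])
put_reported_points('source\\s1', datetime(2024, 2, 15), [(20240101, 1.1)])
```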
datastore.put_field(series_id: str, field: str, value: Any)
Method to write a single series field to a series, i.e., a single key-value pair.
The value's data type must be in accordance with the field name suffix. See more in Series.
Does not write immediately.
datastore.put_fields(series_id: str, fields: Dict[str, Any], once: bool = False, source_obj: bool = False)
Same as above, but writes multiple fields.
If once is True, the fields will only be written once for this series_id; subsequent calls that use once=True will do nothing.
If source_obj is True, the fields will be wrapped under a source_obj dictionary key, as in {"source_obj": fields}.
Does not write immediately.
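The source_obj wrapping, and the once=True bookkeeping, can be shown in plain Python, independent of the processor environment (the put_fields below is a stand-in, not the real method):

```python
fields = {'unit': 'bbl', 'region': 'US'}

# With source_obj=True the fields are nested under a single
# "source_obj" key instead of being written as top-level fields.
wrapped = {'source_obj': fields}

# With once=True, only the first call for a series_id has any effect.
written_once = set()

def put_fields(series_id, fields, once=False):
    if once and series_id in written_once:
        return False        # subsequent once=True calls do nothing
    written_once.add(series_id)
    return True

first = put_fields('source\\s1', fields, once=True)
second = put_fields('source\\s1', fields, once=True)
```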
Methods for deleting series
datastore.delete(series_id: str, warn_if_non_existant: bool = False)
Sends a series to trash. If warn_if_non_existant is True, a warning is logged if the series being deleted does not exist.
datastore.delete_by_query(query: str)
Sends to trash all series that match the query and are within the processor's series prefix (available in settings['series_prefix']).
datastore.delete_untouched(filter_query: Optional[str] = None)
Deletes series with the processor's series prefix that haven't been processed by the current job. If filter_query is passed, it is used in addition to the series prefix to filter the series to delete.
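A stand-in sketch of the cleanup semantics: series written during the current job survive, and everything else under the prefix is trashed (all names here are illustrative):

```python
# Series currently stored under the processor's prefix.
existing = {'pfx\\a', 'pfx\\b', 'pfx\\c'}
touched = set()   # series processed by the current job

# Stand-in write: marks the series as touched by this job.
def put_points(series_id, points):
    touched.add(series_id)

# Stand-in for datastore.delete_untouched.
def delete_untouched():
    existing.intersection_update(touched)

put_points('pfx\\a', [])
put_points('pfx\\b', [])
delete_untouched()   # 'pfx\\c' was never touched, so it is removed
```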
Control methods
datastore.submit()
Method to flush the queue formed by previous calls to put_point, put_points, put_field, put_fields, and datastore.delete, i.e., applies the queued changes to the series.
The queue is also flushed automatically: during the job, once it reaches a certain size; and at the end of a successful job (i.e., a job with no runtime errors).
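The queue-and-flush behavior can be sketched as follows (a stand-in, not the real implementation):

```python
queue, applied = [], []

# Stand-in for datastore.put_point: queued, not yet written.
def put_point(series_id, point):
    queue.append((series_id, point))

# Stand-in for datastore.submit: applies and clears the queue.
def submit():
    applied.extend(queue)
    queue.clear()

put_point('source\\s1', (20240101, 1.0))
put_point('source\\s1', (20240102, 2.0))
submit()   # both queued writes are now applied
```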
datastore.ensure_refreshed()
The index that allows series to be queried by field is periodically updated (usually within a few seconds) with the most recent changes made by datastore.put_points, datastore.put_fields, etc. This method forces an immediate update, making series available for querying right away. Refreshing the index manually is an expensive operation and is not recommended.
It is not necessary to manually refresh the index if you need to query series by a new series ID:
datastore.put_fields(sid, {...})
datastore.get_series(f'sid={sid}', ...)
Manually refreshing is only necessary (although not recommended) when you need to query series by fields that have just been written:
datastore.put_fields(sid, {'color':'red'})
datastore.ensure_refreshed()
for s in sjclient.scroll('color=red'): # This is now able to scroll through the new series
...