[docs]classSriLanka(CountryTestBase):location:str="Sri Lanka"units:str="tests performed"source_label:str="Sri Lanka Health Promotion Bureau"source_url:str="https://www.hpb.health.gov.lk/api/get-current-statistical"source_url_ref:str="https://www.hpb.health.gov.lk"rename_columns:dict={"date":"Date",}
[docs]defread(self)->pd.DataFrame:"""Reads data from source."""data=request_json(self.source_url)df=self._parse_data(data)returndf
[docs]def_parse_data(self,data:dict)->pd.DataFrame:"""Parses data from source."""pcr_df=pd.json_normalize(data,record_path=["data","daily_pcr_testing_data"]).sort_values("date")art_df=pd.json_normalize(data,record_path=["data","daily_antigen_testing_data"]).sort_values("date")df=pd.merge(pcr_df,art_df)returndf
[docs]defpipe_date(self,df:pd.DataFrame)->pd.DataFrame:"""Cleans date column"""returndf.assign(Date=clean_date_series(df["Date"],"%Y-%m-%d"))
[docs]defpipe_metrics(self,df:pd.DataFrame):"""Pipes metrics"""df=df.assign(**{"Daily change in cumulative total":(df["antigen_count"].apply(clean_count)+df["pcr_count"].apply(clean_count)),})returndf[df["Daily change in cumulative total"]>0].drop_duplicates(subset="Date",keep="last")
[docs]defpipeline(self,df:pd.DataFrame)->pd.DataFrame:"""pipeline for data"""return(df.pipe(self.pipe_rename_columns).pipe(self.pipe_date).pipe(self.pipe_metrics).pipe(self.pipe_metadata).sort_values("Date"))
[docs]defexport(self):"""Exports data to csv"""df=self.read().pipe(self.pipeline)self.export_datafile(df)