Adds sql generation for count distinct fact columns population.
parent
2dcbe4efd7
commit
ff4ca5e235
|
@ -69,7 +69,7 @@ writeFiles outputDir env@Env{..} = do
|
||||||
, table <- tabs
|
, table <- tabs
|
||||||
, table `notElem` envTables ]
|
, table `notElem` envTables ]
|
||||||
|
|
||||||
factTablePopulateSQLs typ gen = [ (typ, tableName table, sqlStr $ gen env fact)
|
factTablePopulateSQLs typ gen = [ (typ, tableName table, unlines . map sqlStr $ gen env fact)
|
||||||
| (fact, table) <- factTables ]
|
| (fact, table) <- factTables ]
|
||||||
|
|
||||||
sqls = concat [ dimTableDefnSQLs
|
sqls = concat [ dimTableDefnSQLs
|
||||||
|
|
|
@ -49,6 +49,11 @@ settingsParser = let Settings {..} = defSettings
|
||||||
<*> minorOption "fact-count-col-type"
|
<*> minorOption "fact-count-col-type"
|
||||||
settingFactCountColumnType
|
settingFactCountColumnType
|
||||||
"Type of fact table count columns"
|
"Type of fact table count columns"
|
||||||
|
<*> option auto (long "fact-count-distinct-error-rate"
|
||||||
|
<> hidden
|
||||||
|
<> value settingFactCountDistinctErrorRate
|
||||||
|
<> showDefault
|
||||||
|
<> help "Error rate for count distinct calulations")
|
||||||
<*> minorOption "fact-infix"
|
<*> minorOption "fact-infix"
|
||||||
settingFactInfix
|
settingFactInfix
|
||||||
"Infix for fact tables"
|
"Infix for fact tables"
|
||||||
|
|
|
@ -35,7 +35,7 @@ dimensionTablePopulateSQL :: TablePopulationMode -> Env -> Fact -> TableName ->
|
||||||
dimensionTablePopulateSQL popMode env fact =
|
dimensionTablePopulateSQL popMode env fact =
|
||||||
flip runReader env . G.dimensionTablePopulateSQL popMode fact
|
flip runReader env . G.dimensionTablePopulateSQL popMode fact
|
||||||
|
|
||||||
factTablePopulateSQL :: TablePopulationMode -> Env -> Fact -> Text
|
factTablePopulateSQL :: TablePopulationMode -> Env -> Fact -> [Text]
|
||||||
factTablePopulateSQL popMode env =
|
factTablePopulateSQL popMode env =
|
||||||
flip runReader env . G.factTablePopulateSQL popMode
|
flip runReader env . G.factTablePopulateSQL popMode
|
||||||
|
|
||||||
|
|
|
@ -38,7 +38,7 @@ extractFactTable fact = do
|
||||||
[ Column (cName <> settingAvgCountColumSuffix) countColType NotNull
|
[ Column (cName <> settingAvgCountColumSuffix) countColType NotNull
|
||||||
, Column (cName <> settingAvgSumColumnSuffix) (sourceColumnType scName) NotNull
|
, Column (cName <> settingAvgSumColumnSuffix) (sourceColumnType scName) NotNull
|
||||||
]
|
]
|
||||||
FactCountDistinct _ cName -> [ Column cName (countColType <> "[]") NotNull ]
|
FactCountDistinct _ cName -> [ Column cName "json" NotNull ]
|
||||||
_ -> []
|
_ -> []
|
||||||
|
|
||||||
fks = for allDims $ \(fact', tab@Table {..}) ->
|
fks = for allDims $ \(fact', tab@Table {..}) ->
|
||||||
|
|
|
@ -14,7 +14,7 @@ import Control.Applicative ((<$>))
|
||||||
|
|
||||||
import Control.Monad.Reader (Reader, asks)
|
import Control.Monad.Reader (Reader, asks)
|
||||||
import Data.List (nub, find, subsequences, partition, sortBy)
|
import Data.List (nub, find, subsequences, partition, sortBy)
|
||||||
import Data.Maybe (fromJust, mapMaybe, catMaybes)
|
import Data.Maybe (fromJust, fromMaybe, mapMaybe, catMaybes)
|
||||||
import Data.Monoid ((<>))
|
import Data.Monoid ((<>))
|
||||||
import Data.Ord (comparing)
|
import Data.Ord (comparing)
|
||||||
import Data.Text (Text)
|
import Data.Text (Text)
|
||||||
|
@ -120,14 +120,23 @@ dimensionTablePopulateSQL popMode fact dimTableName = do
|
||||||
<> "\nWHERE " <> Text.intercalate " \nAND "
|
<> "\nWHERE " <> Text.intercalate " \nAND "
|
||||||
[ fullColName dimTableName c <> " IS NULL" | (c, _) <- colMapping ]
|
[ fullColName dimTableName c <> " IS NULL" | (c, _) <- colMapping ]
|
||||||
|
|
||||||
factTablePopulateSQL :: TablePopulationMode -> Fact -> Reader Env Text
|
data FactTablePopulateSelectSQL = FactTablePopulateSelectSQL
|
||||||
|
{ ftpsSelectCols :: ![(Text, Text)]
|
||||||
|
, ftpsSelectTable :: !Text
|
||||||
|
, ftpsJoinClauses :: ![Text]
|
||||||
|
, ftpsWhereClauses :: ![Text]
|
||||||
|
, ftpsGroupByCols :: ![Text]
|
||||||
|
} deriving (Show, Eq)
|
||||||
|
|
||||||
|
factTablePopulateSQL :: TablePopulationMode -> Fact -> Reader Env [Text]
|
||||||
factTablePopulateSQL popMode fact = do
|
factTablePopulateSQL popMode fact = do
|
||||||
Settings {..} <- asks envSettings
|
Settings {..} <- asks envSettings
|
||||||
allDims <- extractAllDimensionTables fact
|
allDims <- extractAllDimensionTables fact
|
||||||
tables <- asks envTables
|
tables <- asks envTables
|
||||||
let fTableName = factTableName fact
|
let fTableName = factTableName fact
|
||||||
table = fromJust . findTable fTableName $ tables
|
table = fromJust . findTable fTableName $ tables
|
||||||
dimIdColName = settingDimTableIdColumnName
|
dimIdColName = settingDimTableIdColumnName
|
||||||
|
tablePKColName = head [ cName | PrimaryKey cName <- tableConstraints table ]
|
||||||
|
|
||||||
timeUnitColumnInsertSQL cName =
|
timeUnitColumnInsertSQL cName =
|
||||||
let colName = timeUnitColumnName dimIdColName cName settingTimeUnit
|
let colName = timeUnitColumnName dimIdColName cName settingTimeUnit
|
||||||
|
@ -138,13 +147,13 @@ factTablePopulateSQL popMode fact = do
|
||||||
)
|
)
|
||||||
|
|
||||||
factColMap = concatFor (factColumns fact) $ \col -> case col of
|
factColMap = concatFor (factColumns fact) $ \col -> case col of
|
||||||
DimTime cName -> [ timeUnitColumnInsertSQL cName ]
|
DimTime cName -> [ timeUnitColumnInsertSQL cName ]
|
||||||
NoDimId cName -> [ (cName, fullColName fTableName cName, True) ]
|
NoDimId cName -> [ (cName, fullColName fTableName cName, True) ]
|
||||||
FactCount scName cName ->
|
FactCount scName cName ->
|
||||||
[ (cName, "count(" <> maybe "*" (fullColName fTableName) scName <> ")", False) ]
|
[ (cName, "count(" <> maybe "*" (fullColName fTableName) scName <> ")", False) ]
|
||||||
FactSum scName cName ->
|
FactSum scName cName ->
|
||||||
[ (cName, "sum(" <> fullColName fTableName scName <> ")", False) ]
|
[ (cName, "sum(" <> fullColName fTableName scName <> ")", False) ]
|
||||||
FactAverage scName cName ->
|
FactAverage scName cName ->
|
||||||
[ ( cName <> settingAvgCountColumSuffix
|
[ ( cName <> settingAvgCountColumSuffix
|
||||||
, "count(" <> fullColName fTableName scName <> ")"
|
, "count(" <> fullColName fTableName scName <> ")"
|
||||||
, False
|
, False
|
||||||
|
@ -154,6 +163,7 @@ factTablePopulateSQL popMode fact = do
|
||||||
, False
|
, False
|
||||||
)
|
)
|
||||||
]
|
]
|
||||||
|
FactCountDistinct _ cName -> [ (cName, "'{}'::json", False)]
|
||||||
_ -> []
|
_ -> []
|
||||||
|
|
||||||
dimColMap = for allDims $ \(dimFact, factTable@Table {..}) ->
|
dimColMap = for allDims $ \(dimFact, factTable@Table {..}) ->
|
||||||
|
@ -169,31 +179,79 @@ factTablePopulateSQL popMode fact = do
|
||||||
<> Text.intercalate "\n AND " dimLookupWhereClauses
|
<> Text.intercalate "\n AND " dimLookupWhereClauses
|
||||||
in (colName, insertSQL, True)
|
in (colName, insertSQL, True)
|
||||||
|
|
||||||
colMap = [ (cName, if addAs then asName cName sql else sql, addAs)
|
colMap = [ (cName, (sql, groupByColPrefix <> cName), addAs)
|
||||||
| (cName, sql, addAs) <- factColMap ++ dimColMap ]
|
| (cName, sql, addAs) <- factColMap ++ dimColMap ]
|
||||||
|
|
||||||
joinClauses =
|
joinClauses =
|
||||||
mapMaybe (\tName -> (\p -> "LEFT JOIN " <> tName <> " ON "<> p) <$> joinClausePreds table tName)
|
mapMaybe (\tName -> (\p -> "LEFT JOIN " <> tName <> "\nON "<> p) <$> joinClausePreds table tName)
|
||||||
. nub
|
. nub
|
||||||
. map (factTableName . fst)
|
. map (factTableName . fst)
|
||||||
$ allDims
|
$ allDims
|
||||||
|
|
||||||
timeCol = fullColName fTableName $ head [ cName | DimTime cName <- factColumns fact ]
|
timeCol = fullColName fTableName $ head [ cName | DimTime cName <- factColumns fact ]
|
||||||
|
|
||||||
return $ "INSERT INTO "
|
extFactTableName =
|
||||||
<> extractedFactTableName settingFactPrefix settingFactInfix (factName fact) settingTimeUnit
|
extractedFactTableName settingFactPrefix settingFactInfix (factName fact) settingTimeUnit
|
||||||
<> " (\n" <> unlineCols (map fst3 colMap) <> "\n)"
|
|
||||||
<> "\nSELECT \n" <> unlineCols (map snd3 colMap)
|
insertIntoSelectSQL =
|
||||||
<> "\nFROM " <> fTableName <> "\n" <> Text.intercalate"\n" joinClauses
|
FactTablePopulateSelectSQL
|
||||||
<> (if popMode == IncrementalPopulation
|
{ ftpsSelectCols = map snd3 colMap
|
||||||
then "\nWHERE " <> timeCol <> " > ? AND " <> timeCol <> " <= ?"
|
, ftpsSelectTable = fTableName
|
||||||
else "")
|
, ftpsJoinClauses = joinClauses
|
||||||
<> "\nGROUP BY \n"
|
, ftpsWhereClauses = if popMode == IncrementalPopulation
|
||||||
<> unlineCols (map ((groupByColPrefix <>) . fst3) . filter thd3 $ colMap)
|
then [timeCol <> " > ?", timeCol <> " <= ?"]
|
||||||
|
else []
|
||||||
|
, ftpsGroupByCols = map ((groupByColPrefix <>) . fst3) . filter thd3 $ colMap
|
||||||
|
}
|
||||||
|
|
||||||
|
insertIntoInsertSQL = "INSERT INTO " <> extFactTableName
|
||||||
|
<> " (\n" <> Text.intercalate ",\n " (map fst3 colMap) <> "\n)"
|
||||||
|
|
||||||
|
countDistinctCols = [ col | col@(FactCountDistinct _ _) <- factColumns fact]
|
||||||
|
|
||||||
|
updateSQLs =
|
||||||
|
let origGroupByCols = ftpsGroupByCols insertIntoSelectSQL
|
||||||
|
origSelectCols = ftpsSelectCols insertIntoSelectSQL
|
||||||
|
|
||||||
|
in for countDistinctCols $ \(FactCountDistinct scName cName) ->
|
||||||
|
let unqCol = fullColName fTableName (fromMaybe tablePKColName scName) <> "::text"
|
||||||
|
|
||||||
|
bucketSelectCols =
|
||||||
|
[ ( "hashtext(" <> unqCol <> ") & "
|
||||||
|
<> Text.pack (show $ bucketCount settingFactCountDistinctErrorRate - 1)
|
||||||
|
, cName <> "_bnum")
|
||||||
|
, ( "31 - floor(log(2, min(hashtext(" <> unqCol <> ") & ~(1 << 31))))::int"
|
||||||
|
, cName <> "_bhash"
|
||||||
|
)
|
||||||
|
]
|
||||||
|
|
||||||
|
selectSQL = toSelectSQL $
|
||||||
|
insertIntoSelectSQL
|
||||||
|
{ ftpsSelectCols = filter ((`elem` origGroupByCols) . snd) origSelectCols ++ bucketSelectCols
|
||||||
|
, ftpsGroupByCols = origGroupByCols ++ [cName <> "_bnum"]
|
||||||
|
, ftpsWhereClauses = ftpsWhereClauses insertIntoSelectSQL ++ [ unqCol <> " IS NOT NULL" ]
|
||||||
|
}
|
||||||
|
|
||||||
|
aggSelectClause =
|
||||||
|
"json_object_agg(" <> cName <> "_bnum, " <> cName <> "_bhash) AS " <> cName
|
||||||
|
|
||||||
|
in "UPDATE " <> extFactTableName
|
||||||
|
<> "\nSET " <> cName <> " = " <> fullColName "xyz" cName
|
||||||
|
<> "\nFROM ("
|
||||||
|
<> "\nSELECT " <> Text.intercalate ",\n" (origGroupByCols ++ [aggSelectClause])
|
||||||
|
<> "\nFROM (\n" <> selectSQL <> "\n) zyx"
|
||||||
|
<> "\nGROUP BY \n" <> Text.intercalate ",\n" origGroupByCols
|
||||||
|
<> "\n) xyz"
|
||||||
|
<> "\n WHERE\n"
|
||||||
|
<> Text.intercalate "\nAND "
|
||||||
|
[ coalesceFKId (fullColName extFactTableName .fromJust . Text.stripPrefix groupByColPrefix $ col)
|
||||||
|
<> " = " <> coalesceFKId (fullColName "xyz" col)
|
||||||
|
| col <- origGroupByCols ]
|
||||||
|
|
||||||
|
return $ insertIntoInsertSQL <> "\n" <> toSelectSQL insertIntoSelectSQL :
|
||||||
|
if null countDistinctCols then [] else updateSQLs
|
||||||
where
|
where
|
||||||
groupByColPrefix = "xxff_"
|
groupByColPrefix = "xxff_"
|
||||||
asName cName sql = "(" <> sql <> ")" <> " as " <> groupByColPrefix <> cName
|
|
||||||
unlineCols = Text.intercalate ",\n "
|
|
||||||
|
|
||||||
joinClausePreds table oTableName =
|
joinClausePreds table oTableName =
|
||||||
fmap (\(ForeignKey _ colPairs) ->
|
fmap (\(ForeignKey _ colPairs) ->
|
||||||
|
@ -205,3 +263,25 @@ factTablePopulateSQL popMode fact = do
|
||||||
_ -> False)
|
_ -> False)
|
||||||
. tableConstraints
|
. tableConstraints
|
||||||
$ table
|
$ table
|
||||||
|
|
||||||
|
toSelectSQL FactTablePopulateSelectSQL {..} =
|
||||||
|
"SELECT \n" <> Text.intercalate ",\n " (map (uncurry asName) ftpsSelectCols)
|
||||||
|
<> "\nFROM " <> ftpsSelectTable
|
||||||
|
<> (if not . null $ ftpsJoinClauses
|
||||||
|
then "\n" <> Text.intercalate"\n" ftpsJoinClauses
|
||||||
|
else "")
|
||||||
|
<> (if not . null $ ftpsWhereClauses
|
||||||
|
then "\nWHERE " <> Text.intercalate "\nAND " ftpsWhereClauses
|
||||||
|
else "")
|
||||||
|
<> "\nGROUP BY \n"
|
||||||
|
<> Text.intercalate ",\n " ftpsGroupByCols
|
||||||
|
where
|
||||||
|
asName sql alias = "(" <> sql <> ")" <> " as " <> alias
|
||||||
|
|
||||||
|
coalesceFKId col = "coalesce(" <> col <> ", -1)"
|
||||||
|
|
||||||
|
bucketCount :: Double -> Integer
|
||||||
|
bucketCount errorRate =
|
||||||
|
let power :: Double = fromIntegral (ceiling . logBase 2 $ (1.04 / errorRate) ** 2 :: Integer)
|
||||||
|
in ceiling $ 2 ** power
|
||||||
|
|
||||||
|
|
|
@ -69,34 +69,36 @@ factColumnName (FactAverage cName _) = Just cName
|
||||||
factColumnName (FactCountDistinct cName _) = cName
|
factColumnName (FactCountDistinct cName _) = cName
|
||||||
|
|
||||||
data Settings = Settings
|
data Settings = Settings
|
||||||
{ settingDimPrefix :: !Text
|
{ settingDimPrefix :: !Text
|
||||||
, settingFactPrefix :: !Text
|
, settingFactPrefix :: !Text
|
||||||
, settingTimeUnit :: !TimeUnit
|
, settingTimeUnit :: !TimeUnit
|
||||||
, settingAvgCountColumSuffix :: !Text
|
, settingAvgCountColumSuffix :: !Text
|
||||||
, settingAvgSumColumnSuffix :: !Text
|
, settingAvgSumColumnSuffix :: !Text
|
||||||
, settingDimTableIdColumnName :: !Text
|
, settingDimTableIdColumnName :: !Text
|
||||||
, settingDimTableIdColumnType :: !Text
|
, settingDimTableIdColumnType :: !Text
|
||||||
, settingFactCountColumnType :: !Text
|
, settingFactCountColumnType :: !Text
|
||||||
, settingFactInfix :: !Text
|
, settingFactCountDistinctErrorRate :: !Double
|
||||||
, settingDependenciesJSONFileName :: !Text
|
, settingFactInfix :: !Text
|
||||||
, settingFactsJSONFileName :: !Text
|
, settingDependenciesJSONFileName :: !Text
|
||||||
, settingDimensionJSONFileName :: !Text
|
, settingFactsJSONFileName :: !Text
|
||||||
|
, settingDimensionJSONFileName :: !Text
|
||||||
} deriving (Eq, Show)
|
} deriving (Eq, Show)
|
||||||
|
|
||||||
defSettings :: Settings
|
defSettings :: Settings
|
||||||
defSettings = Settings
|
defSettings = Settings
|
||||||
{ settingDimPrefix = "dim_"
|
{ settingDimPrefix = "dim_"
|
||||||
, settingFactPrefix = "fact_"
|
, settingFactPrefix = "fact_"
|
||||||
, settingTimeUnit = Minute
|
, settingTimeUnit = Minute
|
||||||
, settingAvgCountColumSuffix = "_count"
|
, settingAvgCountColumSuffix = "_count"
|
||||||
, settingAvgSumColumnSuffix = "_sum"
|
, settingAvgSumColumnSuffix = "_sum"
|
||||||
, settingDimTableIdColumnName = "id"
|
, settingDimTableIdColumnName = "id"
|
||||||
, settingDimTableIdColumnType = "serial"
|
, settingDimTableIdColumnType = "serial"
|
||||||
, settingFactCountColumnType = "integer"
|
, settingFactCountColumnType = "integer"
|
||||||
, settingFactInfix = "_by_"
|
, settingFactCountDistinctErrorRate = 0.05
|
||||||
, settingDependenciesJSONFileName = "dependencies.json"
|
, settingFactInfix = "_by_"
|
||||||
, settingFactsJSONFileName = "facts.json"
|
, settingDependenciesJSONFileName = "dependencies.json"
|
||||||
, settingDimensionJSONFileName = "dimensions.json"
|
, settingFactsJSONFileName = "facts.json"
|
||||||
|
, settingDimensionJSONFileName = "dimensions.json"
|
||||||
}
|
}
|
||||||
|
|
||||||
data ValidationError = MissingTable !TableName
|
data ValidationError = MissingTable !TableName
|
||||||
|
|
Loading…
Reference in New Issue