データ分析の効率を劇的に上げる!DolphinDBプログラミングの裏技大公開


Summary

データ分析でDolphinDBを使いこなしてる人なら一度はハマる『高階関数のあれこれ』について、実際に試して躓いたポイントも交えつつ解説。コードがスッキリする割に、案外ドキュメントに載ってない便利機能が多いんですよね… Key Points:

  • DolphinDBの無名関数(x -> x + 1みたいなやつ)で、ベクトル処理をサクッと書ける裏技 - 例えばeachと組み合わせれば、面倒なループ文から解放されて10行が1行に
  • cross関数で行列演算が激楽になる実例: 共分散行列の計算とか、二重ループが必要な場面でも「cross(covar, matt)」って書くだけで済む(たぶんcolssよりrowssの方が先だった気がするけど…)
  • NULL値チェックや行フィルタリングの小ネタ集: t.values()で全カラム取得→ラムダ式でNULL判定とか、ディクショナリ形式を活かしたt[...]での行抽出(現場によって書き方変わるから好みの問題かも)
高階関数を使いこなせば、DolphinDBスクリプトが劇的に短くなる反面、『どう動いてるのか』をふと考え込む瞬間もまた味わい深い。


DolphinDBって、なんだか関数型プログラミングもサポートしているらしい。宣言的な書き方で、データに対して関数を次々適用しながら問題を解決するスタイルだとか。まあ、関数そのものを他の関数に渡せるやり方があって、それでコードが妙にすっきりしたり、ちょっと面倒な処理でもごく短い行で済むこともあるようだ。ここでは、その中でも特に高階関数とそれを使った事例――よくありそうな使い道――について触れているみたい。

CSVファイルなんかからデータ取り込む場合、時刻の表現が整数になってるのは珍しくないんじゃないかな。「93100000」みたいなのを見ると、「9:31:00.000」辺りだったと思うけど……確かじゃないけど、大体そんな感じだった気がする。このままだと後々クエリとか分析するとき面倒だから、保存前にDolphinDBのTIME型へ変換しちゃうのがおすすめらしい。

loadTextExという関数には_transform_パラメータがあって、その時刻カラムだけ特殊な変換処理を書いておけば、自動的にTIME型への置換えもやれる。

例えばこんなCSV、一部しか見てないけど:
symbol,exchange,cycle,tradingDay,date,time,open,high,low,close,volume,turnover,unixTime
000001,SZSE,1,20180102,20180102,93100000,...

…みたいな構造。この「time」がINT扱いになるので、本来はTIMEとして保存したほうが後々便利。

まずは日付でVALUEパーティション分割されたデータベース作成から始めている。細かい範囲は覚えてないけど、2018年1月初旬から下旬くらいまでだったかな? 手順としてはextractTextSchemaという機能でCSVからスキーマ推定→「time」列だけ手動でTYPE変更(INT→TIME)→その修正版スキーマを元にテーブル定義&パーティション化、と進めていたような……直感的にはschemaTB=extractTextSchema(dataFilePath) みたいな流れだと思う。

ちなみに、このスキーマ抽出プロセス自体飛ばして最初から全部手書きでもOK。

さて、「time」の列値をどうやって整数→時間形式へ直すかというと、自前のi2t(名前適当)という変換関数を書く必要ありそう。それぞれreplaceColumn!みたいなインプレース更新系API(これ速いやつ)で加工してた印象。
ざっくり雰囲気としては、
def i2t(mutable t){
return t.replaceColumn!(`time,t.time.format("000000000").temporalParse("HHmmssSSS"))
}

こういうノリ。実際には「format」でゼロ詰め文字列化→「temporalParse」で時間型へ落とし込み。このi2tをtransform引数経由でloadTextExへ渡すことで自動処理される。

ロードした結果、SELECT文叩いて中身確認──最初のふたつくらい見れば充分かな。
結果として、「09:31:00.000」みたいな形で格納されていた記憶。

一連のコード断片まとめると、
login(`admin,`123456)
dataFilePath="/home/data/candle_201801.csv"
dbPath="dfs://DolphinDBdatabase"
// 以下略

この辺全部並べれば再現できそう。ただ細部まで正確には覚えてなくて、ごく大まかな流れのみ記憶頼り……

もうひとつ別件では、ナノ秒単位のタイムスタンプ(整数)をNANOTIMESTAMP型へ変換するケースもあった気がする。その場合テキストファイル名[nx.txt](区切り記号#)、カラム名もちょっと違う感じ。「SendingTimeInNano」とか「origSendingTimeInNano」とか入っていたようだ。

パーティショニング方式もCOMPOタイプ採用して二重分割──送信時刻×証券ID、それぞれ何千〜万程度幅取ったレンジ設定。全体像思い出す限り、
- データベース複数用意
- テーブルnx新規作成
- スキーマ明示指定

取り込み時はdataTransformなる新しい変換ロジック必要。一箇所ずつnanotimestamp()関数通してNANOTIMESTAMP型へ丸ごと置き換える仕組みにしていたようだった。
def dataTransform(mutable t){
return t.replaceColumn!(`SendingTimeInNano,nanotimestamp(t.SendingTimeInNano)).replaceColumn!(`origSendingTimeInNano,nanotimestamp(t.origSendingTimeInNano))
}

あとはloadTextEx呼び出し時transform=dataTransform追加──delimiter='#'指定も忘れず、と書いてあった気がする。

この辺、一度試せばピンと来る人多そうだけど、不安ならユーザーマニュアル参照したほうが安心かもしれない。本当によくある用途なので……


DolphinDBで関数を書くとき、名前付きのものもあれば、なんか一言で済む無名関数(ラムダ式)みたいなのもあるみたい。例えば、「xってやつをちょっとだけ増やしたいな」って思ったら、x -> x + 1って書いて、それをeachに渡すことで、何人かいる数字たちそれぞれに処理できる。もう少し言うなら、一から十くらいまで順番にならんだ数字の列があって、その一個一個全部に「ちょっと増やしてね」と頼める感じ?まあ、簡単と言えば簡単。

でも、「クロス」っていう高階関数になると話は変わる。2つのベクトルとか行列、まあ配列同士だね、それぞれ全部の組み合わせについて二項関数を適用するみたいなイメージ。たしかfor文が何重にもなったりして面倒そうだけど……本来なら多分二重ループでゴリゴリ書くところを、cross(covar, matt)みたいに一言ですませられるっぽい。covariance matrix計算したい時なんて特に便利そうだよね。でも細かく思い出そうとすると、「rowss」とか「colss」とかどっちがどっちだったかな、と迷うことも。

そのほかにも「each」「peach」「loop」「ploop」みたいなのもあった気がするけど、この辺は毎回全部の要素やカラムごとに繰り返す時役立つっぽい。ただ、「size」は全体の大きさ、「count」は空欄じゃない部分だけ数えてくれるので、その差分がNULL値の個数になる…と誰かが教えてくれた気がする。テーブル構造の場合はt.values()で全部のカラム取り出せて、それぞれ「x->x.size() - x.count()」という式使えばNULL値チェックできる……だったと思うけど、正直そこまで頻繁には使わないから若干曖昧。

あと面白かった(?)のは行ごとのフィルタリングかな。サンプルテーブル作ったとして、一部カラムにNULL混じってたりするんだけど、それを消したいとなった場合、t[each(x -> !(x.id == NULL || x.id2 == NULL), t)]みたいに書けば意外と簡単。それぞれの行はディクショナリ形式で見えるから、「このidとかid2がNULLじゃなければ残してね」と指定できる。でも現場によってはこれ以外にも方法ありそうだし、人によって好み違う印象もある。

結局、高階関数使うことでコード短縮できたりエラー減らせたりする反面、中身まではパッとは把握しづらかったり、一瞬「あれ?」となることもしばしば……そんな気がしている。
Extended Perspectives Comparison:
機能説明適用例
データインポートCSVファイルの取り込みとカラム変換を行うことができる。特に、整数型時間をTIME型に変換することが可能。loadTextEx関数で_transformパラメータを使用して自動整形しながらデータをインポート
高階関数関数を引数として受け取り、複雑な処理を簡潔に記述できる。特に、eachやloopなどの高階関数が活用される。ベクトルや行列の操作、NULL値のカウントなど
NULL値処理NULL以外の要素のみをカウントしたり、行ごとまたは列ごとのフィルタリングが可能。t[each(x -> !(x.id == NULL || x.id2 == NULL), t)] でNULL含む行除去
moving関数指定された範囲内で条件を満たす値の割合を計算し、その結果に基づいて新しい列(signal)を生成する。rangeTest関数と組み合わせて直近20件のclose値チェック
パフォーマンス最適化msum系やmavg系の専用関数利用によって、大量データ処理時の速度向上が図れる。moving(sum)よりもmsum使用時に圧倒的な速度差

CSVファイルの時刻データをTIME型に変換してインポートする手順

もしテーブルの列がかなり多い時、全部列挙するのはちょっと現実的じゃない。そんな時に使える式があって、
t[each(x -> all(isValid(x.values())), t)]

って感じで書くんだとか。中身をざっくり言うと、1行ずつ値を辞書っぽく拾ってきて、それぞれNULLじゃないか(isValid)確かめる。それで「全部OK?」みたいな感じでall関数が効いてる。
まあでも、データ量が七十万とか八十万行超えてくると、こういうrowごとの操作ってどうも遅いらしい。

一方でDolphinDBはカラム単位での処理に強いという話もあって、「カラムごと」のチェック方法もあるわけさ。
t[each(isValid, t.values()).rowAnd()]

これだとisValidを各カラムへバーッとかけて、その結果マトリックス状にしてからrowAndで「どこか抜けてる?」みたいなの探す流れ。でもね、本当にデータ量がぐっと増えると、
The number of cells in a matrix can't exceed 2 billions.

…こんなエラー出たりすることもあるみたい。この場合はreduce使ったやり方になる。
t[reduce(def(x,y) -> x and isValid(y), t.values(), true)]

なんだか冗長だけど、大規模データ向きなんだろう。

3章2節3項ではパフォーマンス差について触れていたと思う。「aaaa_bbbb」みたいな文字列を「bbbb_aaaa」にしたい場合の話だった気がする。まずテーブルtを作成、と。
t=table(take("aaaa_bbbb", 1000000) as str);

この変換には二通りアプローチがあるようだ。一つ目はrow-wiseつまり行ごとの処理。
each(x -> split(x, '_').reverse().concat('_'), t[`str])

これ、一件ずつsplitしてreverseしてconcatして…まぁ地味に手間取るやつ。もう一つはカラム全体まとめて:
pos = strpos(t[`str], "_")
substr(t[`str], pos+1)+"_"+t[`str].left(pos)

アンダースコアの位置特定→そこから先頭・末尾切り分け→合体。この方式ならほんの一瞬(百ミリ秒くらい?)で終わったという声も聞く。一方前者は何倍も時間食うらしい。

コードとしてはこんな感じだった気がする:
t=table(take("aaaa_bbbb", 1000000) as str);

timer r = each(x -> split(x, '_').reverse().concat('_'), t[`str])

timer {
pos = strpos(t[`str], "_")
r = substr(t[`str], pos+1)+"_"+t[`str].left(pos)
}


さて3.2.4節(番号ちゃんと覚えてないけど)は「二つのテーブル内容まったく同じ?」という判定法だったっぽい。同じ構造(たぶん)、同じ値か比較したかったら、
all(each(eqObj, t1.values(), t2.values()))

こういう風にやるそうな。

ところでloop(:U)についてもちょっと説明あったっけ。「each」と似ているようでも返却される型(形)が違うんだよね。
「each」はサブタスクそれぞれの戻り値しだいでvectorだったりmatrixだったりtableになったりバラバラ。でもloopの場合はいつでもtuple固定(ここ不思議)。

例えば各カラムの最大値調べたかったら…
t = table(1 2 3 as id, 4 5 6 as value, `IBM`MSFT`GOOG as name);
t;
loop(max, t.values());

こんな風になる。そしてimport関連――複数ファイルCSV取り込みたかったらloop活用できる例として、
loop(loadText, fileDir + "/" + files(fileDir).filename).unionAll(false)

こういうスニペットも紹介されていた気がする。

移動系関数についてチラッと触れておくと、「moving」を使えば一定範囲内に収まっているレコード抽出できたりする。UpAvgPrice/DownAvgPriceという上下限設定し、「close」列20個分くらい過去までさかのぼってそのレンジ内か確認――こんな用途想像してほしい。ちょっと曖昧だけど、おおよそそんな雰囲気だったと思う。

たしか、「signal」ってやつ、範囲内に入ってる値が七割半くらいならtrueにするんだったかな。データの例だと、例えば2019年6月17日あたりの行では、そのレンジがだいたい11.5~12.8くらいになってて、それより前に二十個ほど並んでる「close」値をざっと見渡して、そのほとんど(四分の三ぐらい)がその範囲内なら「signal」がオンになる。
 
rangeTestという関数で判定してみる手もあるようだ。これ、スライドしながら範囲に収まっているかどうかを数えて、合格ライン(七割台後半ぐらい?)を超えたらOKみたいな感じ。betweenを使うことで区間チェックできるっぽい。「moving」を噛ませば、この検査がずーっとウィンドウごとに走る。

コード自体はこんな雰囲気だったと思うけど──
rangeTest(close, downlimit, uplimit){
size = close.size() - 1
return between(close.subarray(0:size), downlimit.last():uplimit.last()).sum() >= size*0.75
}
update t set signal = moving(rangeTest, [close, downAvgPrice, upAvgPrice], 21)

実験用のダミーデータで動作確認もできそう。「rand」とか使えば、一万件とかそれ以上のランダムデータを適当に生成して試せる。全部日本株っぽく見せかけてもいいし、とりあえず集計には問題ない。

rollingとmoving、このふたつ結構似ているけど、ステップ幅を細かく変えたいならrollingが向いている気もする。ただnull値への対応とかは微妙に違うから、そのへん要調整。

あと速度面で言えば、「moving(sum)」より専用のmsum系(mcountとかmavgも)使った方が遥かに早かった記憶あるなあ。何十倍もの差が出てもおかしくないとか聞いたことある。それもそのはずで、msum系は一度メモリ上に読み込んだデータを再利用できるし、計算自体も増分方式だから無駄なく進める。でもmovingだと毎回新しい部分配列作っちゃうから…まあ仕方ないか。

そんなわけで、大量データ相手だとmsum一択になりそうだけど、小規模や特殊事情ならmovingでも悪くないかなぁ、とぼんやり思ったりする。

ラムダ式を使いこなしてDolphinDBのコードを簡潔に書くコツ

「moving(sum)」って、確かに毎回ウィンドウ全体を再計算してた気がする。ところで、「eachPre」ってやつ、何だったかな……。 まあ、そういう話はさておき、今回はちょっと変わったテーブル操作の例を挙げてみる。

まず、多分五種類ぐらいのシンボル(a〜eとか)が適当に並んでて、それぞれに値段っぽい数字が付いてる表があるとしよう。百行くらい?いや、本当はもう少し多かったかも。でもまあ、その程度。

このテーブルでやりたいことが二つあったはず。一つ目は、「ln」っていう新しい列を作ること。その中身はというと……今の行のbidPriceを、ひとつ前までの三行分(自分自身含めない)のbidPrice平均で割ってから、その結果の自然対数を取る感じ。なんだか面倒くさいけど、要するに直近三件平均との比率を見るってことじゃないかな。

この計算には「moving」とか色々手法があるけど、「mavg」使うと速いよね、と誰か言ってた気がする。「prev」と組み合わせればいい感じにできるんだけど、二通りぐらい書き方あったな……一つ目だと、「mavg」の後ろに「prev」をくっつけてたような。もう一個は順番逆。でも結果として出てくるデータはほぼ同じだった印象。ただ、一部ズレ始める位置が違うくらい?

で、本題。次はその「ln」が妙な値になった場合の処理。「clean」という新しい列を用意して、不自然な変動――例えば急激すぎたりした場合――を除外したかったらしい。「fluctuation threshold」と呼ばれていた境界値Fがあって、その絶対値を超えちゃう場合だけ直前の「ln」を代わりに使う仕組みだったと思う。ただ、このFもごく小さい数字だった気がするけど…正確には覚えてないけど二十も無かったんじゃないかな?

ここでちょっと工夫されていたのが、「eachPre」という関数的なもの。それぞれ隣同士(今と前)についてさっき説明した判定関数(abs(x)>Fならy, それ以外ならx)を適用していく感じだったっぽい。こうすると、一連のデータ全部に対してペアごとの比較・補正処理が走る。

コードとしては、最初にF設定してからテーブル生成→ln計算→cleanFun定義→eachPre適用…そんな流れになってたと思う。途中端折っている部分もあったかもしれないけど、大体こんな雰囲気。

最後ちょっと余談になるけど、「byRow」というセクションではマトリックス内各行ごとの最大値インデックス探索例も紹介されていた記憶あり。でもそこまで詳細追えてなくて…また機会があれば見返したい気もする。

# 関数型プログラミングのケース![]

DolphinDBって、なんだか宣言的なスタイルで「関数を次々データに適用していく」やり方がウリみたい。高階関数とかも普通に使えるから、コードもかなり簡素にできるし、理屈っぽいロジックもすっきり片付くらしい。そういう機能を活かした代表的な使い道、特に高階関数をどう使うかってところ、ここではたぶん何例か紹介されている感じ。

## 1.データ取込まわりの事例

世の中のCSVファイルには、たとえば「93100000」みたいな感じで時刻を整数で持ってるものが割と多い。あれ、「9:31:00.000」を意味してたりするけど、そのままだと検索や集計が面倒だよね。DolphinDBなら、この手のカラムはTIME型とかに変換しちゃったほうが後々楽らしい。

この時、「loadTextEx」っていう読み込み関数についてる_transform_パラメータ――まあ要するに列ごとの変換ロジック――それを指定しておけば、自動的に整形しながらインポートできる仕掛け。

### 1.1 整数タイムスタンプ→TIME型への直し

適当なサンプルCSV(たしか[candle_201801.csv])で説明していた気がする。
頭だけ抜粋すると:

symbol,exchange,cycle,tradingDay,date,time,open,high,low,close,volume,turnover,unixTime
000001,SZSE,1,20180102,20180102,93100000,13.35,...


**(1) データベース作成**
日付単位(VALUEパーティションだったかな?)で新規作成。
login(`admin,`123456)
dataFilePath="/home/data/candle_201801.csv"
dbPath="dfs://DolphinDBdatabase"
db=database(dbPath,VALUE,2018.01.02..2018.01.30)

なんとなくこうだったはず。

**(2) テーブル定義**
extractTextSchemaでCSVからスキーマ自動抽出。ただ「time」は最初INT扱いになるので、そのあとUPDATE文みたいなのでTIME型へ修正。そのスキーマ情報元に本テーブル生成・日付パーティション化。
schemaTB=extractTextSchema(dataFilePath)
update schemaTB set type="TIME" where name="time"
tb=table(1:0,schemaTB.name,schemaTB.type)
tb=db.createPartitionedTable(tb,`tb1,`date);

まあ手作業でカラム書いてもよかった気もするけど、この流れがよく見るやつ。

**(3) インポート処理**
まずカラム加工用の関数i2t定義。これでtime列だけINTからTIMEへ一斉変換させちゃう。そのままtransform引数としてloadTextExに渡すことになる。
def i2t(mutable t){
return t.replaceColumn!(`time,t.time.format("000000000").temporalParse("HHmmssSSS"))
}

ちなみに!付きのメソッドはインプレース高速処理らしい。

そんで実際読むとき:
tmpTB=loadTextEx(dbHandle=db,tableName=`tb1,
partitionColumns=`date,
filename=dataFilePath,
transform=i2t);


**(4) 確認**
最初の二行だけ確認すると、
select top 2 * from loadTable(dbPath,`tb1);

symbol exchange cycle tradingDay date time open ...
------ -------- ----- ---------- ---------- -------------- -----
000001 SZSE ... ... ... 09:31:00.000 ...

こんなふうになってたような覚えある。桁落ちたり細部違ったかもしれないけど、大筋合ってるはず。

**全部まとめて書くとこう:**

login(`admin,...
dataFilePath="/home/data/candle_201801.csv"
dbPath="dfs://DolphinDBdatabase"
db=database(dbPath,...)
schemaTB=extractTextSchema(dataFilePath)
update schemaTB set type="TIME" where name="time"
tb=table(1:0,schemaTB.name,schemaTB.type)
tb=db.createPartitionedTable(tb,...
def i2t(mutable t){
return t.replaceColumn!(`time,t.time.format("000000000").temporalParse("HHmmssSSS"))
}
tmpTB=loadTextEx(dbHandle=db,...
transform=i2t);


### 1.2 整数ナノ秒タイムスタンプ→NANOTIMESTAMP型への変換

もうひとつ事例挙げておくと、今度はナノ秒単位の巨大整数値。「nx.txt」とかいうテキストファイル例だったと思う。
ざっと中身イメージすると、

SendingTimeInNano#securityID#origSendingTimeInNano#bidSize
1579510735948574000#27522#1575277200049000000#1
...

区切り文字が#になってる点とか少しクセ強め。SendingTimeInNano系カラム(複数?)にはナノ秒整数詰め込まれている模様。

**(1) データベース/テーブル設定**
COMPOパーティション方式使いつつ、新規DFS DB&スキーマ明示。この辺ややこしい気配だった記憶。
dbSendingTimeInNano = database(... VALUE ...)
dbSecurityIDRange = database(... RANGE ...)
db = database("dfs://testdb", COMPO,[...])
nameCol = `SendingTimeInNano`securityID...
typeCol = [`NANOTIMESTAMP,...]
schemaTb = table(1:0,nameCol,typeCol)
nx = db.createPartitionedTable(schemaTb,...)


**(2) データ投入**
データ取り込む前段階として、
nanotimestamp()内蔵関数駆使してINT→NANOTIMESTAMP全件変換させるdataTransformという関数を書いて、それをtransformとして指定する流れ。
def dataTransform(mutable t){
return t.replaceColumn!(`SendingTimeInNano,nanotimestamp(t.SendingTimeInNano)).replaceColumn!(`origSendingTimeInNano,nanotimestamp(t.origSendingTimeInNano))
}
pt=loadTextEx(
dbHandle=db,
tableName=`nx ,
partitionColumns=`SendingTimeInNano`securityID,
filename="nx.txt",
delimiter='#',
transform=dataTransform);

細部違う箇所あったらご容赦だけど、おおむねこういう仕組み。テキスト取込み周りでもっと知りたかったらユーザマニュアル参照推奨――そんな案内も添えてあったような?

## 2.

……続きはここまでしか残ってないっぽい

each関数を使ってテーブルのNULL値を効率的に処理するテクニック

DolphinDBで関数を書くとき、なんだか二種類くらいのやり方があるみたい。まあ、名前をつけて定義するのと、その場しのぎというか、匿名的な書き方(ラムダ式とかいう)ね。普通は一文だけ書く感じ?ちょっと前に「x -> x + 1」みたいなコード見た気がする。それを「each」っていう処理に食わせると、一連の数字(多分十個くらいだったかな)が全部ひとつずつ増えて戻ってくる。細かい例はまたどこかで紹介されるっぽい。

それから、高階関数ってやつの使いどころについても触れられていて、「cross」とかいう機能が目立ってた気がする。一度に二組のベクトルや行列を渡して、それぞれ全部掛け合わせて計算させるイメージ?うろ覚えだけど、昔ながらの方法だとネストしたforループを何重にも回して地道に求めてたような…。matlab風に書いたcovariance matrixの計算例もあったけど、とても長々しくて面倒そうだった印象。でもDolphinDBなら「cross(covar, matt)」みたいにすごく短くできちゃう…夢みたいな話。

「each」という高階関数も何度も出てきたなぁ。なんでもないforループより全然手軽で使いやすそう。「peach」とか「loop」「ploop」なんて似た兄弟もいるらしいんだけど、正直細部まではよくわからない…。

あとNULL値絡みで面白かった記憶。ベクトルや行列の場合、「size」で全体のおおよその要素数を取れるけど、「count」だとNULL以外しかカウントしないので、差分がそのまま欠損値になる仕組みっぽい。テーブル形式なら、「t.values()」で各カラムまとめて取得して、「each(x->x.size() - x.count(), t.values())」みたいな形で一斉にNULL数を洗い出せたりする…らしい。

サンプル用テーブルもちょっと作ってたかな。「sym」に三種類くらい文字列詰め込んで百個ちょっと並べたり、「id」「id2」と呼ばれるカラムでは空っぽデータ混ぜたり入れ替えたりして実験していた記憶がある。その中身からNULL入り行だけ取り除く方法…ふた通りくらいあったはず。

まず一つ目は各行ごと調べ上げる方式。これはrow-wise filteringとかそんな言葉だったような?t[each(x -> !(x.id == NULL || x.id2 == NULL), t)]という感じで書いていたんじゃないかな…。ちなみにこの場合、一行ずつdictionary型(連想配列っぽいやつ)になるので注意、と補足もあったような気がする。そのラムダ式は特定カラム(idとかid2)にNULL値が含まれてないか判定している仕掛けになっていた…と思う、多分ね。

もし表にずいぶん多い列があるなら、全部を一つ一つ挙げるなんてほぼ無理。そんなときは例えばこういう式が使われてたりするみたい。

t[each(x -> all(isValid(x.values())), t)]


このやり方はね、row(行)ごとに値を全部取り出して、それぞれがNULLじゃないかどうかを`isValid()`で見ていくっていう流れらしい。でも、数十万とかそれ以上のデータになると、行ごとのチェックってだいぶ遅くなることもある。DolphinDBって列志向らしいから、やっぱり列ごとに処理したほうが速いみたい。それでさ、

t[each(isValid, t.values()).rowAnd()]


これだと全カラムまとめてバッサリ確認できるので効率いいらしい。何かしら超巨大なデータを扱う時には、「セルの数が二十億ちょっと超えてしまいました」みたいなエラーも出ちゃったりするそうだ。そういう場合はreduce使えば解決…っぽい?

t[reduce(def(x,y) -> x and isValid(y), t.values(), true)]


それからパフォーマンス話で思い出すけど、カラム変換の例もあった。「aaaa_bbbb」を「bbbb_aaaa」に入れ替えるやつ。一気に百数十万件くらいやる感じだったかな。row-wise(行単位)の方法では文字分割→逆順→連結という手順を回してたけど…

each(x -> split(x, '_').reverse().concat('_'), t[`str])


column-wise(列単位)だとアンダースコアの位置探して、一括で前後入れ替えしちゃう。

pos = strpos(t[`str], "_")
substr(t[`str], pos+1)+"_"+t[`str].left(pos)


大体だけど、前者だと処理時間が七十倍近く長引いてた気がする。後者なら一瞬、とまでは言わないけど実質待たなくても済むくらい早かった。

コードも一応こんな感じだった:

t=table(take("aaaa_bbbb", 1000000) as str);

timer r = each(x -> split(x, '_').reverse().concat('_'), t[`str])

timer {
pos = strpos(t[`str], "_")
r = substr(t[`str], pos+1)+"_"+t[`str].left(pos)
}


あと似たようなので、「二つのテーブル中身同じ?」って調べるためには各カラムごとに比較すると良さそうで、

all(each(eqObj, t1.values(), t2.values()))


こうやって一致チェックできることになっていた。

"loop"(ループ)について少し脱線すると、「each」とよく似ているけど返す型とか形態に微妙な違いあるみたい。"each"はサブタスクごとの戻り値によって返すもの変わるんだけど、

- サブタスク全部スカラー値ならvector返す。
- ベクトルばっかならマトリックスになる。
- 辞書型ならテーブル。
- それ以外混ざったらtupleになるとか。

例としては、

m=1..12$4:3;
m;
each(add{1 2 3 4}, m);


これでどうなるか…まあ確かベクトルになっていたはず。一方loopの場合は結果が必ずtuple。この辺ちょっと混乱しやすいかな…。

たしかこんなコードだった:

t = table(1 2 3 as id, 4 5 6 as value, `IBM`MSFT`GOOG as name);
t;
loop(max, t.values());


こっちは毎回tupleになる。

ちなみに複数CSVファイルをまとめて読み込む場合にもloop活躍する場面ありそうだね。同じ構造のCSV何枚もディレクトリ内に転がしておいて、一気読みして連結したかったら

loop(loadText, fileDir + "/" + files(fileDir).filename).unionAll(false)


みたいなこと書けばOKっぽかった。

最後にmoving関数の話もちょっと触れておきたい。「一定期間内で値が上下限レンジ収まっているか?」を見る用途。このケースでは「UpAvgPrice」と「DownAvgPrice」が基準で、その間に直近二十件ほどのclose値が収まれば条件クリアだったと思う。そのへん曖昧だけど、多分そんなイメージ。

moving関数でスライディングウィンドウ計算を行う実用的な例

「えーと、もし全体の七割五分をちょっと超えるくらいの値が範囲内に収まっていたら、“signal”カラムはtrueになるんだとか。まあ、そうじゃなければfalseらしい。うーん、例えば2019年6月17日あたりのデータを見てみると、その“range”って[11.5くらい, 12.8強]みたいな感じ。で、その前後二十個ぐらいの“close”値(図だと1番目?)を調べておいて、その大半がこの幅に入ってたら“signal”列(4番目?)がtrueになる仕掛け。

なんか関数でやるみたい。“rangeTest”っていうやつで、ウィンドウごとの“close”値がどれだけ範囲にいるかざっくり計算して、それを高階関数(movingだったかな…)で表全体にスライドさせて使うって話だったような気もする。

こんなコード例もあったような…いや違ったかもしれないけど:

defg rangeTest(close, downlimit, uplimit){
size = close.size() - 1
return between(close.subarray(0:size), downlimit.last():uplimit.last()).sum() >= size*0.75
}

update t set signal = moving(rangeTest, [close, downAvgPrice, upAvgPrice], 21)

要するに今見てる行よりも前の二十行ぐらいをチェックしたいから、ウィンドウサイズは二十一。でも実際にはきっちり二十一じゃなくてもいい場面もありそう。“between”という関数で、それぞれの値がその区間内かどうか確認してるみたい。

そもそも本物の市場データでもこれ検証できるよ、と作者は言いたかったんだろうね。例えば乱数使って適当に表作ったりしてテストする、と。

t=table(rand("d"+string(1..n),n) as ts_code, nanotimestamp(2008.01.10+1..n) as trade_date, rand(n,n) as open, rand(n,n) as high, rand(n,n) as low, rand(n,n) as close, rand(n,n) as pre_close, rand(n,n) as change, rand(n,n) as pct_chg, rand(n,n) as vol, rand(n,n) as amount, rand(n,n) as downAvgPrice, rand(n,n) as upAvgPrice, rand(1 0,n) as singna)

あとね、“rolling”と“moving”、似てるけど細かい点で違うんだよね。“rolling”はステップ幅とか指定できたりするし、“moving”は一つずつしか進めない。一方でnullへの対応もちょっと違うとか聞いたことあるけど、自分もうろ覚え…。

それからパフォーマンス比較について書いてあった気が…。特定用途ならmsumとかmavg系(m-関数)の方が断然速いよ、と。“moving(sum)”より何十倍も早くなることだって珍しくない。例えば百万円規模近くの配列だったらmsumならほんのわずかな時間ですむし、moving(sum)は結構遅れる。データアクセス方法にも違いがあって、msum系ならメモリ上ですべて再利用できるけど、moving系は毎回新しいサブオブジェクト作っちゃうから無駄多め。それに加えて計算自体もmsumの場合は一つ足して一つ引くだけだから効率的。まあ要するに大量データ処理では専用関数推奨ですよということなんじゃないかなと思う。」

対照的に言うと、「moving(sum)」って、どうも毎回ウィンドウ全体を再計算してるっぽいんですよね。なんかちょっと面倒というか、時間がかかるような気がします。さて、次の話題ですけど……「eachPre(:P)」を使った例で、まあ分かりやすくするために適当にシンボルと価格のテーブルを用意してみました。たしか五つくらいの銘柄名(aとかbとか…eまでだったかな?)があって、それぞれのbidPriceが七十だか八十だか百近くランダムで生成されている感じです。

このサンプルでやりたいことは二つあるんですが、まず一つ目。「ln」という新しい列を作る必要があります。これはね、各行について、そのbidPriceを直前三行分くらい(厳密には三行だけど、自分はそこまで細かく見てない)の平均値で割ってから、その商の自然対数――要するにlogですね――それを求めます。ちなみにこの時点では今までの自分の経験上、大体こんなやり方になると思いますけど、人によっては違うアプローチもありえるので絶対じゃないです。

そして次なんですが、この「ln」列からさらに「clean」っていうものを派生させます。これはどういうものかというと、「ln」の中にヘンな値――つまり普通じゃ考えられないくらい大きく振れてしまった値――が混じっていた場合、それだけ除外したいわけですね。でも完全に捨てるわけじゃなくて、一個前の「ln」の値で置き換えてみる、といった感じになります。ただし何をもって“異常”とするかは微妙なところで、自分の場合はフラクトエーション閾値Fみたいなの決めています。このFについても本当に正しい値なのかよく分からず、多分0.02ぐらい(いやもっと小さい/大きい人もいるでしょう)にしています。

コード的には、「ln」を出すためにはmoving(avg, bidPrice, 3)みたいなの使う方法もありますが、自分はもうちょっと効率重視なのでmavg関数推しです。prev(mavg(bidPrice, 3))としておけば過去三件ぐらいの移動平均になる……はずです。たぶんこれ、t2とかt22みたいなテーブル名になっていますけど、中身自体ほぼ同じでした。

それから異常値クリーニングですけど、「cleanFun」という関数を書いておいてabs(x) > Fだったらy(つまり1個前)に戻すようになっています。このロジック自体も完全無欠とは限らなくて、本当はもっと複雑でもいいんでしょうね。ただ今回は簡単なのでこうしています。それをeachPreという高階関数でごそっと処理すると、各行ペアごとに判定される仕組みになっています。

実際全部合わせたスクリプト例としては、
- F = 0.02ぐらい
- テーブル作成
- ln計算
- 関数定義
- eachPre適用

この流れですが……まあ正直途中途中思いつきレベルで書いているので穴がある気もします。そのまま流用して大丈夫とは全然言えません!

そうそう、このあとbyRow(:H)関連でもう一個例があった気がしますね。それは確かマトリックス状データについて各行ごとの最大値インデックス探す方法だったと思います。でも細部までは覚えてないし、その辺また別機会に触れることになると思います……

Reference Articles


Yann LeCun

Expert

Related Discussions

❖ Related Articles